Skip to content

Commit

Permalink
Merge pull request #16229 from camunda/bcan-backport-15784-to-stable/8.4
Browse files Browse the repository at this point in the history
[Backport stable/8.4] Fix: Zeebe gateway fails to stream out activated jobs by not respecting the max message size #15784
  • Loading branch information
berkaycanbc authored Feb 5, 2024
2 parents a5e4a7e + 450075b commit 27ba208
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 16 deletions.
8 changes: 6 additions & 2 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,16 @@ private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerCl
if (gatewayCfg.getLongPolling().isEnabled()) {
return buildLongPollingHandler(brokerClient);
} else {
return new RoundRobinActivateJobsHandler(brokerClient);
return new RoundRobinActivateJobsHandler(
brokerClient, gatewayCfg.getNetwork().getMaxMessageSize().toBytes());
}
}

private LongPollingActivateJobsHandler buildLongPollingHandler(final BrokerClient brokerClient) {
return LongPollingActivateJobsHandler.newBuilder().setBrokerClient(brokerClient).build();
return LongPollingActivateJobsHandler.newBuilder()
.setBrokerClient(brokerClient)
.setMaxMessageSize(gatewayCfg.getNetwork().getMaxMessageSize().toBytes())
.build();
}

private ServerServiceDefinition applyInterceptors(final BindableService service) {
Expand Down
58 changes: 52 additions & 6 deletions gateway/src/main/java/io/camunda/zeebe/gateway/ResponseMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import io.camunda.zeebe.protocol.impl.record.value.signal.SignalRecord;
import io.camunda.zeebe.protocol.impl.record.value.variable.VariableDocumentRecord;
import io.camunda.zeebe.protocol.record.value.EvaluatedDecisionValue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.agrona.DirectBuffer;

public final class ResponseMapper {
Expand Down Expand Up @@ -276,22 +278,63 @@ public static SetVariablesResponse toSetVariablesResponse(
return SetVariablesResponse.newBuilder().setKey(key).build();
}

public static ActivateJobsResponse toActivateJobsResponse(
final long key, final JobBatchRecord brokerResponse) {
final ActivateJobsResponse.Builder responseBuilder = ActivateJobsResponse.newBuilder();

/**
* While converting the broker response to the gRPC response, the response size is checked. If
* adding a job would make the response exceed the maximum response size, that job is left out of
* the response and added to a separate list of jobs to be reactivated.
*
* <p>This is because the jobs returned from the broker are in MessagePack format, and while
* converting them to the gRPC response the size of the response may increase (e.g. we do JSON and
* String conversions, see {@link #toActivatedJob(long, JobRecord)}). That can cause the response
* size to exceed the maximum response size allowed by the gateway, and the gateway will log a
* Stream Error indicating that streaming out the activated jobs failed.
*
* <p>If we do not respect the actual max response size, the Zeebe Java Client rejects the response
* containing the activated jobs and cancels the channel/stream/connection as well, leaving the
* failed jobs non-activatable until their configured timeout.
*
* @param key the key of the request
* @param brokerResponse the broker response
* @param maxResponseSize the maximum size of the response
* @return a pair of the response and a list of jobs that could not be included in the response
* because the response size exceeded the maximum response size
*/
public static JobActivationResult toActivateJobsResponse(
final long key, final JobBatchRecord brokerResponse, final long maxResponseSize) {
final Iterator<LongValue> jobKeys = brokerResponse.jobKeys().iterator();
final Iterator<JobRecord> jobs = brokerResponse.jobs().iterator();

long currentResponseSize = 0L;
final List<ActivatedJob> sizeExceedingJobs = new ArrayList<>();
final List<ActivatedJob> responseJobs = new ArrayList<>();

while (jobKeys.hasNext() && jobs.hasNext()) {
final LongValue jobKey = jobKeys.next();
final JobRecord job = jobs.next();
final ActivatedJob activatedJob = toActivatedJob(jobKey.getValue(), job);

responseBuilder.addJobs(activatedJob);
final int activatedJobSize = activatedJob.getSerializedSize();
if (currentResponseSize + activatedJobSize <= maxResponseSize) {
responseJobs.add(activatedJob);
currentResponseSize += activatedJobSize;
} else {
sizeExceedingJobs.add(activatedJob);
}
}

return responseBuilder.build();
ActivateJobsResponse response =
ActivateJobsResponse.newBuilder().addAllJobs(responseJobs).build();
// Response size can still exceed the maximum response size because of the metadata added on
// building the response. Therefore, we check the response size again and if the response size
// is still exceeding the maximum response size, we remove the last added job from the response
// and add it to the list of jobs to be reactivated.
// We do this until the response size is below the maximum response size.
while (!responseJobs.isEmpty() && response.getSerializedSize() > maxResponseSize) {
sizeExceedingJobs.add(responseJobs.removeLast());
response = ActivateJobsResponse.newBuilder().addAllJobs(responseJobs).build();
}

return new JobActivationResult(response, sizeExceedingJobs);
}

public static ActivatedJob toActivatedJob(
Expand Down Expand Up @@ -353,6 +396,9 @@ private static String bufferAsJson(final DirectBuffer customHeaders) {
return MsgPackConverter.convertToJson(bufferAsArray(customHeaders));
}

/**
 * Result of mapping a broker job batch to a gRPC activate-jobs response under a size limit.
 *
 * @param activateJobsResponse the gRPC response built from the jobs that fit within the limit
 * @param jobsToDefer the jobs that were left out of the response because including them would
 *     have exceeded the maximum response size; the caller is expected to reactivate them
 */
public record JobActivationResult(
ActivateJobsResponse activateJobsResponse, List<ActivatedJob> jobsToDefer) {}

@FunctionalInterface
public interface BrokerResponseMapper<BrokerResponseDto, GrpcResponseT> {
GrpcResponseT apply(long key, BrokerResponseDto responseDto);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ public final class LongPollingActivateJobsHandler implements ActivateJobsHandler

private LongPollingActivateJobsHandler(
final BrokerClient brokerClient,
final long maxMessageSize,
final long longPollingTimeout,
final long probeTimeoutMillis,
final int failedAttemptThreshold) {
this.brokerClient = brokerClient;
activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient);
activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient, maxMessageSize);
this.longPollingTimeout = Duration.ofMillis(longPollingTimeout);
this.probeTimeoutMillis = probeTimeoutMillis;
this.failedAttemptThreshold = failedAttemptThreshold;
Expand Down Expand Up @@ -375,6 +376,7 @@ public static class Builder {
private static final int EMPTY_RESPONSE_THRESHOLD = 3;

private BrokerClient brokerClient;
private long maxMessageSize;
private long longPollingTimeout = DEFAULT_LONG_POLLING_TIMEOUT;
private long probeTimeoutMillis = DEFAULT_PROBE_TIMEOUT;
private int minEmptyResponses = EMPTY_RESPONSE_THRESHOLD;
Expand All @@ -384,6 +386,11 @@ public Builder setBrokerClient(final BrokerClient brokerClient) {
return this;
}

/**
 * Sets the maximum message size that {@code build()} passes on to the handler.
 *
 * <p>NOTE(review): the backing field has no default initializer, so it is 0 when this setter is
 * not called, and {@code build()} only null-checks the broker client. Confirm that every caller
 * sets this value, since a limit of 0 would presumably cause all activated jobs to be deferred.
 *
 * @param maxMessageSize the maximum message size; the visible callers pass a byte count
 * @return this builder, to allow call chaining
 */
public Builder setMaxMessageSize(final long maxMessageSize) {
this.maxMessageSize = maxMessageSize;
return this;
}

public Builder setLongPollingTimeout(final long longPollingTimeout) {
this.longPollingTimeout = longPollingTimeout;
return this;
Expand All @@ -402,7 +409,7 @@ public Builder setMinEmptyResponses(final int minEmptyResponses) {
public LongPollingActivateJobsHandler build() {
Objects.requireNonNull(brokerClient, "brokerClient");
return new LongPollingActivateJobsHandler(
brokerClient, longPollingTimeout, probeTimeoutMillis, minEmptyResponses);
brokerClient, maxMessageSize, longPollingTimeout, probeTimeoutMillis, minEmptyResponses);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.rpc.Status;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.ResponseMapper;
import io.camunda.zeebe.gateway.ResponseMapper.JobActivationResult;
import io.camunda.zeebe.gateway.cmd.BrokerErrorException;
import io.camunda.zeebe.gateway.cmd.BrokerRejectionException;
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
Expand Down Expand Up @@ -44,17 +45,21 @@ public final class RoundRobinActivateJobsHandler implements ActivateJobsHandler
private static final String ACTIVATE_JOB_NOT_SENT_MSG = "Failed to send activated jobs to client";
private static final String ACTIVATE_JOB_NOT_SENT_MSG_WITH_REASON =
ACTIVATE_JOB_NOT_SENT_MSG + ", failed with: %s";
private static final String MAX_MESSAGE_SIZE_EXCEEDED_MSG =
"the response is bigger than the maximum allowed message size %d";

private final Map<String, RequestDispatchStrategy> jobTypeToNextPartitionId =
new ConcurrentHashMap<>();
private final BrokerClient brokerClient;
private final BrokerTopologyManager topologyManager;
private final long maxMessageSize;

private ActorControl actor;

public RoundRobinActivateJobsHandler(final BrokerClient brokerClient) {
public RoundRobinActivateJobsHandler(final BrokerClient brokerClient, final long maxMessageSize) {
this.brokerClient = brokerClient;
topologyManager = brokerClient.getTopologyManager();
this.maxMessageSize = maxMessageSize;
}

@Override
Expand Down Expand Up @@ -152,11 +157,23 @@ private void handleResponseSuccess(
actor.run(
() -> {
final var response = brokerResponse.getResponse();
final ActivateJobsResponse grpcResponse =
ResponseMapper.toActivateJobsResponse(brokerResponse.getKey(), response);
final JobActivationResult jobActivationResult =
ResponseMapper.toActivateJobsResponse(
brokerResponse.getKey(), response, maxMessageSize);

final List<ActivatedJob> jobsToDefer = jobActivationResult.jobsToDefer();
if (!jobsToDefer.isEmpty()) {
final var jobKeys = jobsToDefer.stream().map(ActivatedJob::getKey).toList();
final var jobType = request.getType();
final var reason = String.format(MAX_MESSAGE_SIZE_EXCEEDED_MSG, maxMessageSize);

logResponseNotSent(jobType, jobKeys, reason);
reactivateJobs(jobsToDefer, reason);
}

final ActivateJobsResponse grpcResponse = jobActivationResult.activateJobsResponse();
final var jobsCount = grpcResponse.getJobsCount();
final var jobsActivated = jobsCount > 0;

if (jobsActivated) {
final var result = request.tryToSendActivatedJobs(grpcResponse);
final var responseWasSent = result.getOrElse(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,16 @@ private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerCl
if (config.getLongPolling().isEnabled()) {
return buildLongPollingHandler(brokerClient);
} else {
return new RoundRobinActivateJobsHandler(brokerClient);
return new RoundRobinActivateJobsHandler(
brokerClient, config.getNetwork().getMaxMessageSize().toBytes());
}
}

private LongPollingActivateJobsHandler buildLongPollingHandler(final BrokerClient brokerClient) {
return LongPollingActivateJobsHandler.newBuilder().setBrokerClient(brokerClient).build();
return LongPollingActivateJobsHandler.newBuilder()
.setBrokerClient(brokerClient)
.setMaxMessageSize(config.getNetwork().getMaxMessageSize().toBytes())
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.springframework.util.unit.DataSize;

public final class LongPollingActivateJobsTest {

Expand All @@ -73,6 +74,7 @@ public final class LongPollingActivateJobsTest {
private static final long PROBE_TIMEOUT = 20000;
private static final int FAILED_RESPONSE_THRESHOLD = 3;
private static final int MAX_JOBS_TO_ACTIVATE = 2;
private static final long MAX_MESSAGE_SIZE = DataSize.ofMegabytes(4).toBytes();
private final ControlledActorClock actorClock = new ControlledActorClock();
@Rule public final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule(actorClock);
private LongPollingActivateJobsHandler handler;
Expand All @@ -87,6 +89,7 @@ public void setup() {
handler =
LongPollingActivateJobsHandler.newBuilder()
.setBrokerClient(brokerClient)
.setMaxMessageSize(MAX_MESSAGE_SIZE)
.setLongPollingTimeout(LONG_POLLING_TIMEOUT)
.setProbeTimeoutMillis(PROBE_TIMEOUT)
.setMinEmptyResponses(FAILED_RESPONSE_THRESHOLD)
Expand Down Expand Up @@ -246,6 +249,7 @@ public void shouldProbeIfNoNotificationReceived() throws Exception {
handler =
LongPollingActivateJobsHandler.newBuilder()
.setBrokerClient(brokerClient)
.setMaxMessageSize(MAX_MESSAGE_SIZE)
.setLongPollingTimeout(20000)
.setProbeTimeoutMillis(probeTimeout)
.build();
Expand All @@ -270,6 +274,7 @@ public void shouldProbeNextRequestWhenBlockedRequestsTimedOut() throws Exception
handler =
LongPollingActivateJobsHandler.newBuilder()
.setBrokerClient(brokerClient)
.setMaxMessageSize(MAX_MESSAGE_SIZE)
.setLongPollingTimeout(longPollingTimeout)
.setProbeTimeoutMillis(probeTimeout)
.build();
Expand Down
Loading

0 comments on commit 27ba208

Please sign in to comment.