Skip to content

Commit

Permalink
refactor: submit actor jobs to activate jobs in round robin handler
Browse files Browse the repository at this point in the history
(cherry picked from commit 9f0b9aa)
  • Loading branch information
romansmirnov authored and github-actions[bot] committed Mar 22, 2022
1 parent 389e330 commit bd03543
Showing 1 changed file with 53 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,34 @@ private void activateJobs(
final InflightActivateJobsRequest request,
final InflightActivateJobsRequestState requestState,
final ResponseObserverDelegate delegate) {

if (requestState.shouldActivateJobs()) {
final var brokerRequest = request.getRequest();
final var partitionId = requestState.getNextPartition();
final var remainingAmount = requestState.getRemainingAmount();

// partitions to check and jobs to activate left
brokerRequest.setPartitionId(partitionId);
brokerRequest.setMaxJobsToActivate(remainingAmount);

brokerClient
.sendRequest(brokerRequest)
.whenComplete(handleBrokerResponse(request, requestState, delegate));

} else {
// enough jobs activated or no more partitions left to check
final var remainingAmount = requestState.getRemainingAmount();
final var resourceExhaustedWasPresent = requestState.wasResourceExhaustedPresent();
delegate.onCompleted(remainingAmount, resourceExhaustedWasPresent);
}
actor.run(
() -> {
if (requestState.shouldActivateJobs()) {
final var brokerRequest = request.getRequest();
final var partitionId = requestState.getNextPartition();
final var remainingAmount = requestState.getRemainingAmount();

// partitions to check and jobs to activate left
brokerRequest.setPartitionId(partitionId);
brokerRequest.setMaxJobsToActivate(remainingAmount);

brokerClient
.sendRequest(brokerRequest)
.whenComplete(handleBrokerResponse(request, requestState, delegate));

} else {
// enough jobs activated or no more partitions left to check
final var remainingAmount = requestState.getRemainingAmount();
final var resourceExhaustedWasPresent = requestState.wasResourceExhaustedPresent();
delegate.onCompleted(remainingAmount, resourceExhaustedWasPresent);
}
});
}

private BiConsumer<BrokerResponse<JobBatchRecord>, Throwable> handleBrokerResponse(
final InflightActivateJobsRequest request,
final InflightActivateJobsRequestState requestState,
final ResponseObserverDelegate delegate) {

return (brokerResponse, error) -> {
if (error == null) {
handleResponseSuccess(request, requestState, delegate, brokerResponse);
Expand All @@ -132,38 +133,44 @@ private void handleResponseSuccess(
final InflightActivateJobsRequestState requestState,
final ResponseObserverDelegate delegate,
final BrokerResponse<JobBatchRecord> brokerResponse) {
final var response = brokerResponse.getResponse();
final ActivateJobsResponse grpcResponse =
ResponseMapper.toActivateJobsResponse(brokerResponse.getKey(), response);
final int jobsCount = grpcResponse.getJobsCount();
if (jobsCount > 0) {
delegate.onResponse(grpcResponse);
}

final var remainingJobsToActivate = requestState.getRemainingAmount() - jobsCount;
final var shouldPollCurrentPartitionAgain = response.getTruncated();

requestState.setRemainingAmount(remainingJobsToActivate);
requestState.setPollPrevPartition(shouldPollCurrentPartitionAgain);
activateJobs(request, requestState, delegate);
actor.run(
() -> {
final var response = brokerResponse.getResponse();
final ActivateJobsResponse grpcResponse =
ResponseMapper.toActivateJobsResponse(brokerResponse.getKey(), response);
final int jobsCount = grpcResponse.getJobsCount();
if (jobsCount > 0) {
delegate.onResponse(grpcResponse);
}

final var remainingJobsToActivate = requestState.getRemainingAmount() - jobsCount;
final var shouldPollCurrentPartitionAgain = response.getTruncated();

requestState.setRemainingAmount(remainingJobsToActivate);
requestState.setPollPrevPartition(shouldPollCurrentPartitionAgain);
activateJobs(request, requestState, delegate);
});
}

private void handleResponseError(
final InflightActivateJobsRequest request,
final InflightActivateJobsRequestState state,
final ResponseObserverDelegate delegate,
final Throwable error) {
final var wasResourceExhausted = wasResourceExhausted(error);
if (isRejection(error)) {
delegate.onError(error);
return;
} else if (!wasResourceExhausted) {
logErrorResponse(state.getCurrentPartition(), request.getType(), error);
}

state.setResourceExhaustedWasPresent(wasResourceExhausted);
state.setPollPrevPartition(false);
activateJobs(request, state, delegate);
actor.run(
() -> {
final var wasResourceExhausted = wasResourceExhausted(error);
if (isRejection(error)) {
delegate.onError(error);
return;
} else if (!wasResourceExhausted) {
logErrorResponse(state.getCurrentPartition(), request.getType(), error);
}

state.setResourceExhaustedWasPresent(wasResourceExhausted);
state.setPollPrevPartition(false);
activateJobs(request, state, delegate);
});
}

private boolean isRejection(final Throwable error) {
Expand Down

0 comments on commit bd03543

Please sign in to comment.