Skip to content

Commit

Permalink
refactor(stream-platform): remove unnecessary single method call
Browse files Browse the repository at this point in the history
(cherry picked from commit a436243)
  • Loading branch information
berkaycanbc authored and deepthidevaki committed Feb 26, 2024
1 parent 01cef5b commit 7c8039b
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
*
* +------------------+ +--------------------+
* | | | | exception
* | readNextRecord() |----------->| processCommand() |------------------+
* | tryToReadNextRecord() |----------->| processCommand() |------------------+
* | | | | v
* +------------------+ +--------------------+ +---------------+
* ^ | | |------+
Expand Down Expand Up @@ -201,14 +201,10 @@ public ProcessingStateMachine(
private void skipRecord() {
notifySkippedListener(currentRecord);
markProcessingCompleted();
actor.submit(this::readNextRecord);
actor.submit(this::tryToReadNextRecord);
metrics.eventSkipped();
}

void readNextRecord() {
tryToReadNextRecord();
}

void markProcessingCompleted() {
inProcessing = false;
if (onErrorRetries > 0) {
Expand All @@ -217,7 +213,7 @@ void markProcessingCompleted() {
}
}

private void tryToReadNextRecord() {
void tryToReadNextRecord() {
final var hasNext = logStreamReader.hasNext();

if (currentRecord != null) {
Expand Down Expand Up @@ -700,7 +696,7 @@ private void executeSideEffects() {

// continue with next record
markProcessingCompleted();
actor.submit(this::readNextRecord);
actor.submit(this::tryToReadNextRecord);
});
}

Expand Down Expand Up @@ -753,7 +749,7 @@ public void startProcessing(final LastProcessingPositions lastProcessingPosition
lastWrittenPosition = lastProcessingPositions.getLastWrittenPosition();
}

actor.submit(this::readNextRecord);
actor.submit(this::tryToReadNextRecord);
}

private record BatchProcessingStepResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ public void resumeProcessing() {
lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onResumed);
streamProcessorContext.streamProcessorPhase(Phase.PROCESSING);
if (processingStateMachine != null) {
actor.submit(processingStateMachine::readNextRecord);
actor.submit(processingStateMachine::tryToReadNextRecord);
}
LOG.debug("Resumed processing for partition {}", partitionId);
}
Expand All @@ -573,7 +573,7 @@ public void resumeProcessing() {

@Override
public void onRecordAvailable() {
actor.run(processingStateMachine::readNextRecord);
actor.run(processingStateMachine::tryToReadNextRecord);
}

private static final class AsyncProcessingScheduleServiceActor extends Actor {
Expand Down

0 comments on commit 7c8039b

Please sign in to comment.