Skip to content

Commit

Permalink
fix issue #787 for cannot close and exit properly when rebalancing st…
Browse files Browse the repository at this point in the history
…orm (#789)

* fix issue #787 for cannot close and exit properly when rebalancing storm

* update changelog

* address comments
  • Loading branch information
sangreal authored Jun 6, 2024
1 parent 1962472 commit ba169c1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ endif::[]

=== Fixes

* fix: unable to close and exit properly during a rebalancing storm (#787)
* fix: Support for PCRetriableException in ReactorProcessor (#733)
* fix: NullPointerException on partitions revoked (#757)
* fix: remove lingeringOnCommitWouldBeBeneficial and unused imports (#732)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public static <K, V> ControllerEventMessage<K, V> of(WorkContainer<K, V> work) {
*/
private final AtomicBoolean currentlyPollingWorkCompleteMailBox = new AtomicBoolean();

/**
* Indicates that shutdown is currently waiting for in-flight messages to complete processing.
* Used to prevent the control thread from being interrupted by the wakeup logic during rebalances.
*/
private final AtomicBoolean awaitingInflightProcessingCompletionOnShutdown = new AtomicBoolean();

private final OffsetCommitter committer;

/**
Expand Down Expand Up @@ -639,6 +645,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
}

log.debug("Awaiting worker pool termination...");
awaitingInflightProcessingCompletionOnShutdown.getAndSet(true);
boolean awaitingInflightCompletion = true;
while (awaitingInflightCompletion) {
log.debug("Still awaiting completion of inflight work");
Expand All @@ -657,6 +664,8 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
awaitingInflightCompletion = true;
}
}
awaitingInflightProcessingCompletionOnShutdown.getAndSet(false);

if (workerThreadPool.get().getActiveCount() > 0) {
log.warn("Clean execution pool termination failed - some threads still active despite await and interrupt - is user function swallowing interrupted exception? Threads still not done count: {}", workerThreadPool.get().getActiveCount());
}
Expand Down Expand Up @@ -1401,7 +1410,8 @@ public void registerWork(EpochAndRecordsMap<K, V> polledRecords) {
*/
public void notifySomethingToDo() {
boolean noTransactionInProgress = !producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false);
if (noTransactionInProgress) {
// do not interrupt while workerThreadPool is draining submitted / inflight tasks
if (noTransactionInProgress && !awaitingInflightProcessingCompletionOnShutdown.get()) {
log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!");
interruptControlThread();
} else {
Expand Down

0 comments on commit ba169c1

Please sign in to comment.