Skip to content

Commit

Permalink
Revert Free events handling on waitForCleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Sep 26, 2024
1 parent 8068f1d commit 09f40ff
Showing 1 changed file with 1 addition and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1395,10 +1395,8 @@ private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanDa

private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
long started = System.currentTimeMillis();
final var futures = new HashMap<String, CompletableFuture<Void>>();
while (System.currentTimeMillis() - started < maxWaitTimeInMillis) {
boolean cleaned = true;
futures.clear();
for (var etr : tableview.entrySet()) {
var serviceUnit = etr.getKey();
var data = etr.getValue();
Expand All @@ -1407,9 +1405,7 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
continue;
}

if (data.state() == Free) {
futures.put(serviceUnit, handleFreeEvent(serviceUnit, data));
} else if (data.state() == Owned && broker.equals(data.dstBroker())) {
if (data.state() == Owned && broker.equals(data.dstBroker())) {
cleaned = false;
break;
}
Expand All @@ -1430,18 +1426,6 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
}
}
}
var waitTimeMs = started + maxWaitTimeInMillis - System.currentTimeMillis();
if (waitTimeMs < 0) {
waitTimeMs = 0;
}
try {
FutureUtil.waitForAll(futures.values()).get(waitTimeMs, MILLISECONDS);
} catch (ExecutionException e) {
log.error("Failed to tombstone {}", futures.keySet(), e.getCause());
} catch (TimeoutException __) {
log.warn("Failed to tombstone {} in {} ms", futures.keySet(), waitTimeMs);
} catch (InterruptedException ignored) {
}
log.info("Finished cleanup waiting for orphan broker:{}. Elapsed {} ms", brokerId,
System.currentTimeMillis() - started);
}
Expand Down

0 comments on commit 09f40ff

Please sign in to comment.