Skip to content

Commit

Permalink
show record topic in slow-work warning message
Browse files Browse the repository at this point in the history
  • Loading branch information
ghmulti authored and astubbs committed Jul 27, 2021
1 parent 7458405 commit faa40c2
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
var staleWorkToRemove = new ArrayList<WorkContainer<K, V>>();

var slowWorkCount = 0;
var slowWorkTopics = new HashSet<String>();

//
for (var shard : it) {
Expand Down Expand Up @@ -306,6 +307,7 @@ public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
String msg = "Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.";
if (toSeconds(timeInFlight) > toSeconds(thresholdForTimeSpentInQueueWarning)) {
slowWorkCount++;
slowWorkTopics.add(workContainer.getCr().topic());
log.trace("Work has spent over " + thresholdForTimeSpentInQueueWarning + " in queue! "
+ msg, workContainer, delayHasPassed, workContainer.isNotInFlight(), hasNotSucceededAlready, timeInFlight);
} else {
Expand All @@ -330,9 +332,8 @@ public List<WorkContainer<K, V>> maybeGetWork(int requestedMaxWorkToRetrieve) {
if (slowWorkCount > 0) {
final int finalSlowWorkCount = slowWorkCount;
slowWarningRateLimit.performIfNotLimited(() -> log.warn("Warning: {} records in the queue have been " +
"waiting longer than {}.",
finalSlowWorkCount, toSeconds(thresholdForTimeSpentInQueueWarning)));

"waiting longer than {}s for following topics {}.",
finalSlowWorkCount, toSeconds(thresholdForTimeSpentInQueueWarning), slowWorkTopics));
}

// remove found stale work outside of loop
Expand Down

0 comments on commit faa40c2

Please sign in to comment.