Skip to content

Commit

Permalink
[fix] [broker] Remove blocking calls from Subscription.getStats (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored and hanmz committed Feb 12, 2025
1 parent 180aa16 commit da3d918
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -1200,7 +1202,26 @@ public long estimateBacklogSize() {
return cursor.getEstimatedSizeSinceMarkDeletePosition();
}

/**
* @deprecated please call {@link #getStatsAsync(GetStatsOptions)}.
*/
@Deprecated
public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
// So far, there is no case hits this check.
if (getStatsOptions.isGetEarliestTimeInBacklog()) {
throw new IllegalArgumentException("Calling the sync method subscription.getStats with"
+ " getEarliestTimeInBacklog, it may encountered a deadlock error.");
}
// The method "getStatsAsync" will be a sync method if the param "isGetEarliestTimeInBacklog" is false.
try {
return getStatsAsync(getStatsOptions).get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// This error will never occur.
throw new RuntimeException(e);
}
}

public CompletableFuture<SubscriptionStatsImpl> getStatsAsync(GetStatsOptions getStatsOptions) {
SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
Expand Down Expand Up @@ -1273,21 +1294,6 @@ public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
} else {
subStats.backlogSize = -1;
}
if (getStatsOptions.isGetEarliestTimeInBacklog()) {
if (subStats.msgBacklog > 0) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
Position markDeletedPosition = cursor.getMarkDeletedPosition();
long result = 0;
try {
result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
} catch (InterruptedException | ExecutionException e) {
result = -1;
}
subStats.earliestMsgPublishTimeInBacklog = result;
} else {
subStats.earliestMsgPublishTimeInBacklog = -1;
}
}
subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
Expand Down Expand Up @@ -1329,7 +1335,20 @@ public SubscriptionStatsImpl getStats(GetStatsOptions getStatsOptions) {
subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
subStats.nonContiguousDeletedMessagesRangesSerializedSize =
cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
return subStats;
if (!getStatsOptions.isGetEarliestTimeInBacklog()) {
return CompletableFuture.completedFuture(subStats);
}
if (subStats.msgBacklog > 0) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
Position markDeletedPosition = cursor.getMarkDeletedPosition();
return managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).thenApply(v -> {
subStats.earliestMsgPublishTimeInBacklog = v;
return subStats;
});
} else {
subStats.earliestMsgPublishTimeInBacklog = -1;
return CompletableFuture.completedFuture(subStats);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2584,7 +2584,6 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
@Override
public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions getStatsOptions) {

CompletableFuture<TopicStatsImpl> statsFuture = new CompletableFuture<>();
TopicStatsImpl stats = new TopicStatsImpl();

ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats = new ObjectObjectHashMap<>();
Expand Down Expand Up @@ -2617,32 +2616,6 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();

subscriptions.forEach((name, subscription) -> {
SubscriptionStatsImpl subStats = subscription.getStats(getStatsOptions);

stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.subscriptions.put(name, subStats);
stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
stats.delayedMessageIndexSizeInBytes += subStats.delayedMessageIndexSizeInBytes;

subStats.bucketDelayedIndexStats.forEach((k, v) -> {
TopicMetricBean topicMetricBean =
stats.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new TopicMetricBean());
topicMetricBean.name = v.name;
topicMetricBean.labelsAndValues = v.labelsAndValues;
topicMetricBean.value += v.value;
});

if (isSystemCursor(name) || name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
stats.bytesOutInternalCounter += subStats.bytesOutCounter;
}
});

replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.computeStats();

Expand Down Expand Up @@ -2692,21 +2665,52 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
return compactionRecord;
});

if (getStatsOptions.isGetEarliestTimeInBacklog() && stats.backlogSize != 0) {
ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliestTime, e) -> {
if (e != null) {
log.error("[{}] Failed to get earliest message publish time in backlog", topic, e);
statsFuture.completeExceptionally(e);
} else {
stats.earliestMsgPublishTimeInBacklogs = earliestTime;
statsFuture.complete(stats);
}
});
} else {
statsFuture.complete(stats);
}
Map<String, CompletableFuture<SubscriptionStatsImpl>> subscriptionFutures = new HashMap<>();
subscriptions.forEach((name, subscription) -> {
subscriptionFutures.put(name, subscription.getStatsAsync(getStatsOptions));
});
return FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore -> {
for (Map.Entry<String, CompletableFuture<SubscriptionStatsImpl>> e : subscriptionFutures.entrySet()) {
String name = e.getKey();
SubscriptionStatsImpl subStats = e.getValue().join();
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.subscriptions.put(name, subStats);
stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
stats.delayedMessageIndexSizeInBytes += subStats.delayedMessageIndexSizeInBytes;

subStats.bucketDelayedIndexStats.forEach((k, v) -> {
TopicMetricBean topicMetricBean =
stats.bucketDelayedIndexStats.computeIfAbsent(k, ignore2 -> new TopicMetricBean());
topicMetricBean.name = v.name;
topicMetricBean.labelsAndValues = v.labelsAndValues;
topicMetricBean.value += v.value;
});

return statsFuture;
if (isSystemCursor(name) || name.startsWith(SystemTopicNames.SYSTEM_READER_PREFIX)) {
stats.bytesOutInternalCounter += subStats.bytesOutCounter;
}
}
if (getStatsOptions.isGetEarliestTimeInBacklog() && stats.backlogSize != 0) {
CompletableFuture finalRes = ledger.getEarliestMessagePublishTimeInBacklog()
.thenApply((earliestTime) -> {
stats.earliestMsgPublishTimeInBacklogs = earliestTime;
return stats;
});
// print error log.
finalRes.exceptionally(ex -> {
log.error("[{}] Failed to get earliest message publish time in backlog", topic, ex);
return null;
});
return finalRes;
} else {
return CompletableFuture.completedFuture(stats);
}
});
}

private Optional<CompactorMXBean> getCompactorMXBean() {
Expand Down

0 comments on commit da3d918

Please sign in to comment.