diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsLimiter.java index 5969b991cb50e..15573b59fb4ed 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsLimiter.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsLimiter.java @@ -33,10 +33,21 @@ public class PendingReadsLimiter { .help("Estimated number of bytes for pending reads from storage") .register(); + private static final Gauge PULSAR_ML_PENDING_READS_AVAILABLE_BUFFER_SIZE = Gauge + .build() + .name("pulsar_ml_pending_reads_available_buffer_size") + .help("Available space on the pending reads buffer") + .register(); + private final long maxPendingReadsBufferSize; private long remainingPendingRequestsBytes; public PendingReadsLimiter(long maxPendingReadsBufferSize) { + if (maxPendingReadsBufferSize <= 0) { + // set it to -1 in order to show in the metrics that the metric is not available + PULSAR_ML_PENDING_READS_BUFFER_SIZE.set(-1); + PULSAR_ML_PENDING_READS_AVAILABLE_BUFFER_SIZE.set(-1); + } this.maxPendingReadsBufferSize = maxPendingReadsBufferSize; this.remainingPendingRequestsBytes = maxPendingReadsBufferSize; } @@ -69,9 +80,8 @@ Handle acquire(long permits, Handle current) { if (remainingPendingRequestsBytes == 0) { return new Handle(0, false, 1, System.currentTimeMillis()); } - long needed = permits; - if (remainingPendingRequestsBytes >= needed) { - remainingPendingRequestsBytes -= needed; + if (remainingPendingRequestsBytes >= permits) { + remainingPendingRequestsBytes -= permits; return new Handle(permits, true, 1, System.currentTimeMillis()); } else { long possible = remainingPendingRequestsBytes; @@ -116,6 +126,7 @@ void release(Handle handle) { private synchronized void updateMetrics() { PULSAR_ML_PENDING_READS_BUFFER_SIZE.set(maxPendingReadsBufferSize - remainingPendingRequestsBytes); + PULSAR_ML_PENDING_READS_AVAILABLE_BUFFER_SIZE.set(remainingPendingRequestsBytes); } public boolean isDisabled() {