Skip to content

Commit

Permalink
PendingReadsLimiter - address some review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Oct 14, 2022
1 parent 2f07dda commit 88c9bc4
Showing 1 changed file with 14 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,21 @@ public class PendingReadsLimiter {
.help("Estimated number of bytes for pending reads from storage")
.register();

private static final Gauge PULSAR_ML_PENDING_READS_AVAILABLE_BUFFER_SIZE = Gauge
.build()
.name("pulsar_ml_pending_reads_available_buffer_size")
.help("Available space on the pending reads buffer")
.register();

private final long maxPendingReadsBufferSize;
private long remainingPendingRequestsBytes;

public PendingReadsLimiter(long maxPendingReadsBufferSize) {
if (maxPendingReadsBufferSize <= 0) {
// set it to -1 in order to show in the metrics that the metric is not available
PULSAR_ML_PENDING_READS_BUFFER_SIZE.set(-1);
PULSAR_ML_PENDING_READS_AVAILABLE_BUFFER_SIZE.set(-1);
}
this.maxPendingReadsBufferSize = maxPendingReadsBufferSize;
this.remainingPendingRequestsBytes = maxPendingReadsBufferSize;
}
Expand Down Expand Up @@ -69,9 +80,8 @@ Handle acquire(long permits, Handle current) {
if (remainingPendingRequestsBytes == 0) {
return new Handle(0, false, 1, System.currentTimeMillis());
}
long needed = permits;
if (remainingPendingRequestsBytes >= needed) {
remainingPendingRequestsBytes -= needed;
if (remainingPendingRequestsBytes >= permits) {
remainingPendingRequestsBytes -= permits;
return new Handle(permits, true, 1, System.currentTimeMillis());
} else {
long possible = remainingPendingRequestsBytes;
Expand Down Expand Up @@ -116,6 +126,7 @@ void release(Handle handle) {

private synchronized void updateMetrics() {
PULSAR_ML_PENDING_READS_BUFFER_SIZE.set(maxPendingReadsBufferSize - remainingPendingRequestsBytes);
PULSAR_ML_PENDING_READS_AVAILABLE_BUFFER_SIZE.set(remainingPendingRequestsBytes);
}

public boolean isDisabled() {
Expand Down

0 comments on commit 88c9bc4

Please sign in to comment.