[improve][broker] Issue 17952: Limit the number of pending requests to BookKeeper to save the broker from OODM #17953
Conversation
Force-pushed from 4e68b80 to 030b8a5
Codecov Report
@@             Coverage Diff              @@
##             master    #17953     +/-   ##
============================================
+ Coverage     34.91%    35.90%    +0.98%
+ Complexity     5707      4596     -1111
============================================
  Files           607       393      -214
  Lines         53396     43424     -9972
  Branches       5712      4462     -1250
============================================
- Hits          18644     15591     -3053
+ Misses        32119     25579     -6540
+ Partials       2633      2254      -379
Flags with carried forward coverage won't be shown.
This will be a great addition @eolivelli!
long now = System.currentTimeMillis();
if (now - newHandle.creationTime > readEntryTimeoutMillis) {
    String message = "Time-out elapsed reading from ledger "
            + lh.getId()
            + ", " + rangeEntryCache.getName()
            + ", estimated read size " + estimatedReadSize + " bytes"
            + " for " + (1 + lastEntry - firstEntry) + " entries";
    log.error(message);
    pendingReadsLimiter.release(newHandle);
    originalCallback.readEntriesFailed(
            new ManagedLedgerException.TooManyRequestsException(message), ctx);
    return null;
}
this.rangeEntryCache.ml.getExecutor().submitOrdered(lh.getId(), () -> {
    readEntriesInternal(lh, firstEntry, lastEntry, shouldCacheEntry,
            originalCallback, ctx, newHandle);
    return null;
});
return null;
Can we make this reactive by queuing cursors that have requested more entries and then feed those cursors as memory gets freed? We'd need a scheduled task to see if the next request in the queue is timed out, but it'd prevent us from putting these executables into the queue with high frequency.
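For illustration, here is a minimal sketch of such a queue-based approach; everything in it (PendingReadQueue, PendingRead, the one-second expiry interval) is hypothetical and not part of this PR:
// Hypothetical sketch: reads that cannot get enough memory permits are parked
// in a FIFO queue and re-fed as permits are released; a scheduled task expires
// requests that waited too long instead of re-submitting each read.
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PendingReadQueue {
    // A parked read request: the bytes it needs, when it was queued, and what to run.
    record PendingRead(long estimatedReadSize, long enqueueTimeMillis,
                       Runnable read, Runnable onTimeout) {}

    private final Queue<PendingRead> queue = new ArrayDeque<>();
    private final long timeoutMillis;
    private long availableBytes;

    PendingReadQueue(long maxBytes, long timeoutMillis, ScheduledExecutorService scheduler) {
        this.availableBytes = maxBytes;
        this.timeoutMillis = timeoutMillis;
        scheduler.scheduleWithFixedDelay(this::expireTimedOut, 1, 1, TimeUnit.SECONDS);
    }

    synchronized void submit(PendingRead read) {
        queue.add(read);
        drain();
    }

    // Called when a completed read releases its permits.
    synchronized void release(long bytes) {
        availableBytes += bytes;
        drain();
    }

    // Serve parked reads in FIFO order while enough memory is available.
    private void drain() {
        while (!queue.isEmpty() && queue.peek().estimatedReadSize() <= availableBytes) {
            PendingRead next = queue.poll();
            availableBytes -= next.estimatedReadSize();
            next.read().run();
        }
    }

    private synchronized void expireTimedOut() {
        long now = System.currentTimeMillis();
        while (!queue.isEmpty() && now - queue.peek().enqueueTimeMillis() > timeoutMillis) {
            queue.poll().onTimeout().run();
        }
    }
}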
Can we make this reactive by queuing cursors that have requested more entries and then feed those cursors as memory gets freed?
I have thought about creating some kind of "async rate limiter", but in the end you would have to keep a list (or priority queue) of the reads that are waiting for the needed amount of permits to become available.
Then the problem would be to implement some "fair" algorithm that:
- does not let the pending reads starve
- tries to be "fair" and serves the pending reads in FIFO order.
With the current mechanism we schedule the read on the thread that is pinned to the ManagedLedger, and this naturally adds some back pressure depending on the read demand of the ML.
In one broker all the MLs compete for the available memory, and I don't want to keep a single global list of pending reads, as it would be really hard to make it "fair".
With this approach each ML basically has its own list of waiting pending reads (the internal queue of the pinned executor).
To clarify, your concern about fairness is not to have a global FIFO, but is to ensure that the QoS for independent subscriptions is fair, right? A global queue would be fair in that it would be FIFO, but could risk becoming a performance bottleneck. However, we'd only ever queue pending reads when we're over the memory threshold, so the performance is already going to be degraded.
One potential option for ensuring fairness is to put that on the rate limiting solution.
With the current mechanism we are scheduling the read on the Thread that is pinned to the ManagerLedger, and this naturally adds some kind of back pressure depending on the demands of reads of the ML.
One potential issue for this solution is that the managed ledger thread is already responsible for a lot. This scheduled task has no back off, so it will run very frequently and could become expensive if there is contention on the PendingReadsLimiter lock.
With this approach basically each ML will have its own list of waiting pending reads (the internal queue of the pinned executor).
It seems to me the current solution has a race where each ML is competing to get the memory permits. The race does not prevent a single ML from getting all of the permits.
private static final Gauge PULSAR_ML_PENDING_READS_BUFFER_SIZE = Gauge
        .build()
        .name("pulsar_ml_pending_reads_buffer_size")
        .help("Estimated number of bytes for pending reads from storage")
        .register();
Another interesting metric would be the amount of free space left for pending reads. It could technically be computed from this metric as long as an operator knows the configured limit, so it might not be important to add. It would, however, be a good signal that the broker is under heavy load, though if the load comes from a single topic with many subscriptions, load balancing will not help.
added
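For reference, a gauge for the remaining capacity could look roughly like the sketch below; the metric name and helper are hypothetical and the name actually added in the PR may differ:
// Hypothetical sketch of a gauge exposing the memory still available for
// pending reads; the metric name here is illustrative only.
import io.prometheus.client.Gauge;

public class PendingReadsAvailableBufferMetric {
    private static final Gauge PULSAR_ML_PENDING_READS_AVAILABLE_BUFFER_SIZE = Gauge
            .build()
            .name("pulsar_ml_pending_reads_available_buffer_size")
            .help("Estimated number of bytes still available for pending reads from storage")
            .register();

    // Called whenever permits are acquired or released.
    static void update(long remainingPendingRequestsBytes) {
        PULSAR_ML_PENDING_READS_AVAILABLE_BUFFER_SIZE.set(remainingPendingRequestsBytes);
    }
}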
    // feature is disabled
    return DISABLED;
}
synchronized (this) {
This could create additional contention in the read path. I don't see an immediate solution. I am only mentioning it in case someone else has concerns. I assume this is why maxMessagePublishBufferSizeInMB is divided up to create limits for each thread.
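For context, the per-thread splitting technique referred to here works roughly as in the sketch below (illustrative only, not code from this PR or from the publish-buffer limiter):
// Illustrative sketch: a global memory limit divided across N IO threads, so
// each thread checks only its own share and no shared lock is needed.
// Assumes acquire and release happen on the same IO thread.
class PerThreadMemoryLimit {
    private final long perThreadLimitBytes;
    private final ThreadLocal<long[]> usedBytes = ThreadLocal.withInitial(() -> new long[1]);

    PerThreadMemoryLimit(long globalLimitBytes, int numIoThreads) {
        this.perThreadLimitBytes = globalLimitBytes / numIoThreads;
    }

    // Returns true if the current thread may buffer 'size' more bytes.
    boolean tryAcquire(long size) {
        long[] counter = usedBytes.get();
        if (counter[0] + size > perThreadLimitBytes) {
            return false;
        }
        counter[0] += size;
        return true;
    }

    void release(long size) {
        usedBytes.get()[0] -= size;
    }
}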
Here we don't have control over the thread that is going to run this code.
Also, the thread that runs this code will not be the thread that allocates the memory, because that will be the BK internal thread for the Ledger (not the ManagedLedger).
    long size = entries.get(0).getLength();
    estimatedEntrySize = size;
}
pendingReadsLimiter.release(newHandle);
I agree with the design choice to release the acquired byte permits here, but I think it leaves us open to potential OOM issues when consumers are slow to consume from the TCP connection. In that case, you could have many consumers request entries and then not consume them. Also in that case, we know the exact size of the entries, and we are potentially putting them in the read cache in catch-up scenarios. It might be worth exploring another feature that limits these in-memory messages to also add back pressure to consumers that are requesting more entries.
Technically, there is an attempted back pressure implementation here (lines 386 to 392 in db26073):
if (!consumer.isWritable()) {
    // If the connection is not currently writable, we issue the read request anyway, but for a single
    // message. The intent here is to keep use the request as a notification mechanism while avoiding to
    // read and dispatch a big batch of messages which will need to wait before getting written to the
    // socket.
    availablePermits = 1;
}
However, I think we want something more robust. Even in the case where the outbound consumer connection is writable, we might be filling up memory with many messages.
Additionally, there is a limit on the size of cached entries, but if we have many reads in flight, we could have issues without an upper bound on the entries that have already been read but not yet released from the broker's memory.
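A rough sketch of the kind of extra back pressure being suggested here; none of this exists in the PR and the names are hypothetical:
// Hypothetical sketch: track bytes that have been read for a consumer
// connection but not yet flushed to the socket, and stop issuing new reads
// once a threshold is exceeded.
class InFlightDispatchLimiter {
    private final long maxInFlightBytes;
    private long inFlightBytes;

    InFlightDispatchLimiter(long maxInFlightBytes) {
        this.maxInFlightBytes = maxInFlightBytes;
    }

    // Checked before scheduling a new read for this consumer.
    synchronized boolean canRead() {
        return inFlightBytes < maxInFlightBytes;
    }

    synchronized void onEntriesRead(long bytes) {
        inFlightBytes += bytes;
    }

    // Called once the entries have been written to the TCP connection.
    synchronized void onEntriesFlushed(long bytes) {
        inFlightBytes -= bytes;
    }
}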
When we reach this point the "read" has been performed and it is now the higher level component (whatever it is; not only Cursors may use ManagedLedgers) that is in charge of the memory allocated by BK.
Here we are counting the size of this virtual "buffer" that contains the bytes read from BK but not yet passed to the rest of the application.
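A simplified, self-contained sketch of the lifecycle described above (class and method names are illustrative, not the exact code of this PR): permits are acquired before the read is issued to BookKeeper and released once the entries are handed over to the caller, so the limiter only accounts for bytes that have been read but not yet passed to the rest of the application.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class PendingReadsBufferSketch {
    // Bytes still available for new reads; starts at the configured limit.
    private final AtomicLong remainingBytes;

    PendingReadsBufferSketch(long maxBytes) {
        this.remainingBytes = new AtomicLong(maxBytes);
    }

    <T> CompletableFuture<List<T>> read(long estimatedReadSize,
                                        Supplier<CompletableFuture<List<T>>> readFromBookKeeper) {
        // 1. Account for the estimated size before the read is sent to BookKeeper.
        remainingBytes.addAndGet(-estimatedReadSize);
        return readFromBookKeeper.get().whenComplete((entries, error) -> {
            // 2. Release the permits when the entries (or an error) are handed back
            //    to the caller; from this point on the caller owns the memory.
            remainingBytes.addAndGet(estimatedReadSize);
        });
    }
}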
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java (outdated review comment, resolved)
if (remainingPendingRequestsBytes == 0) {
    return new Handle(0, false, 1, System.currentTimeMillis());
}
long needed = permits;
Nit. Can we remove the needed variable?
sure
… from OODM
Modifications:
- limit the global number of pending reads to BookKeeper
- new configuration entry managedLedgerMaxPendingReadsBufferSizeInMB (0 = feature disabled)
- we estimate the entry size per-topic, using the size of the last read entry (new metric pulsar_ml_pending_reads_estimated_entry_size, per topic)
- new metric pulsar_ml_pending_reads_buffer_size to get the estimated size of in-flight read requests in bytes
- if the feature is disabled this change does not add significant overhead
Result: This prevents the broker from going to OutOfMemory (direct memory) in case of many concurrent reads from the bookie
…agedLedgerFactoryConfig.java Co-authored-by: Michael Marshall <[email protected]>
Force-pushed from 4616050 to 88c9bc4
@michaeljmarshall I have addressed your comments, PTAL
I have some enhancements on this patch.
Fixes #17952
Modifications:
- limit the global number of pending reads to BookKeeper
- new configuration entry managedLedgerMaxPendingReadsBufferSizeInMB (0 = feature disabled)
- estimate the entry size per-topic, using the size of the last read entry (new metric pulsar_ml_pending_reads_estimated_entry_size)
- new metric pulsar_ml_pending_reads_buffer_size with the estimated size of in-flight read requests in bytes
- if the feature is disabled this change does not add significant overhead
Result:
This prevents the broker from going to OutOfMemory (direct memory) in case of many concurrent reads from the bookie
Motivation
see #17952
There is currently no limit on the amount of memory that is needed to transfer the read results from storage (BK or Tiered Storage).
We need a way to protect the broker from allocating too much memory.
Modifications
- new configuration entry managedLedgerMaxPendingReadsBufferSizeInMB (0 = feature disabled)
Verifying this change
This change added tests.
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: eolivelli#18