Added in-flight cancellation of SearchShardTask based on resource consumption #4575
Conversation
Codecov Report
@@ Coverage Diff @@
## main #4575 +/- ##
============================================
+ Coverage 70.60% 70.75% +0.15%
- Complexity 57603 57794 +191
============================================
Files 4675 4687 +12
Lines 276925 277290 +365
Branches 40347 40376 +29
============================================
+ Hits 195517 196193 +676
+ Misses 65156 64779 -377
- Partials 16252 16318 +66
Thanks @ketanv3, I have shared a few thoughts on how we could break down the core logic into separate classes for better maintainability.
Resolved review threads (outdated):
- server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureSettings.java
- server/src/main/java/org/opensearch/search/backpressure/TaskCancellation.java
- server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureManager.java (4 threads)
public synchronized double record(long value) {
    long delta = value - observations[(int) (count % observations.length)];
    observations[(int) (count % observations.length)] = value;

    count++;
    sum += delta;
    average = (double) sum / Math.min(count, observations.length);
    return average;
}
Can we use a queue and see if we can avoid the synchronized block by using CAS, if possible?
I actually plan to benchmark CAS vs synchronized approaches before committing to either one of them, especially since the operations are pretty simple and quick to execute.
If there are major gains with CAS then it makes sense to go with it. Otherwise, I would prefer keeping it simple and readable.
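For illustration, a minimal JMH harness along these lines could drive such a comparison (the MovingAverage constructor argument and the shared-instance setup are assumptions here, not the actual benchmark code):

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark) // a single shared instance so all benchmark threads contend on it
public class MovingAverageBenchmark {
    private final MovingAverage movingAverage = new MovingAverage(100); // assumed window size

    @Benchmark
    @Threads(16) // run with 1, 4 and 16 to simulate the contention levels below
    public double timeMovingAverage() {
        // Returning the result prevents JMH from dead-code-eliminating the call.
        return movingAverage.record(42);
    }
}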
I did not observe any major gains with the queue + CAS approach as followed in indexing back-pressure (here). With higher thread contention, it performed slightly worse.
# JMH version: 1.35
# VM version: JDK 17.0.3, OpenJDK 64-Bit Server VM, 17.0.3+7
Benchmark (1 thread - no contention) Mode Cnt Score Error Units
MovingAverageBenchmark.timeMovingAverage avgt 5 25.669 ± 1.884 ns/op
MovingAverageBenchmark.timeMovingAverageQueue avgt 5 25.213 ± 0.383 ns/op
Benchmark (4 threads - low contention) Mode Cnt Score Error Units
MovingAverageBenchmark.timeMovingAverage avgt 5 217.714 ± 6.676 ns/op
MovingAverageBenchmark.timeMovingAverageQueue avgt 5 223.088 ± 3.651 ns/op
Benchmark (16 threads - high contention) Mode Cnt Score Error Units
MovingAverageBenchmark.timeMovingAverage avgt 5 785.830 ± 13.446 ns/op
MovingAverageBenchmark.timeMovingAverageQueue avgt 5 792.442 ± 64.234 ns/op
It is also worth noting that the current implementation of the moving average in shard indexing back-pressure has subtle race-condition bugs. Two or more concurrent threads could reach this point and remove too many elements from the queue, leading to incorrect results, or even an NPE in the worst case. Another problem is the lack of causal ordering: an older thread may overwrite a newer thread's average at this point.
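To make the race concrete, here is a simplified sketch of the check-then-act pattern being described (illustrative only, not the actual shard indexing back-pressure code):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

class RacyMovingAverage {
    private final int windowSize = 100;
    private final ConcurrentLinkedQueue<Long> window = new ConcurrentLinkedQueue<>();
    private final AtomicLong sum = new AtomicLong();

    double record(long value) {
        window.add(value);
        sum.addAndGet(value);

        // Check-then-act is not atomic: two threads can both observe a full window
        // and both evict, shrinking the window and corrupting the running sum.
        if (window.size() > windowSize) {
            // poll() may return null if other threads already drained the queue;
            // unboxing it without the null check below is where the NPE risk comes from.
            Long evicted = window.poll();
            if (evicted != null) {
                sum.addAndGet(-evicted);
            }
        }
        return (double) sum.get() / window.size();
    }
}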
Resolved review thread (outdated): server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureManager.java
Force-pushed 4456edd to caad78d.
Force-pushed caad78d to ef01ba3.
Force-pushed ef01ba3 to 9d0f2f0.
Resolved review threads:
- server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java (4 threads)
- server/src/main/java/org/opensearch/tasks/TaskCancellation.java (2 threads, outdated)
Force-pushed 9d0f2f0 to 1f33317.
Force-pushed 1f33317 to 6445b4a.
LGTM!
Resolved review thread (outdated): server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
public synchronized double record(long value) {
    long delta = value - observations[(int) (count % observations.length)];
    observations[(int) (count % observations.length)] = value;

    count++;
    sum += delta;
    average = (double) sum / Math.min(count, observations.length);
    return average;
}
Can we use CAS here with a do-while loop?
Using CAS backed by a queue or a ring-buffer, it may be possible to track the moving average similar to how it's done in indexing backpressure (though the current implementation has subtle race-condition bugs which I have highlighted in an above comment).
To implement this correctly, we need to ensure the last 'n' items, the running sum, and the count of inserted items are updated atomically, which may not be possible with CAS alone. We would need to treat the entire state (even the backing queue/buffer) as immutable and create a new copy with every update (a similar concept to CopyOnWriteArrayList).
Our use-case is write heavy (on task completion) with infrequent reads (on search backpressure service iteration), so creating copies may be very expensive, especially for larger window sizes. I'm still inclined to use the current approach.
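A rough sketch of what that copy-on-write style would look like (names and structure are assumptions for illustration, not the benchmarked implementation):

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

class CasMovingAverage {
    // Immutable snapshot of the full state: window contents, running sum and count.
    private static final class State {
        final long[] observations;
        final long sum;
        final long count;

        State(long[] observations, long sum, long count) {
            this.observations = observations;
            this.sum = sum;
            this.count = count;
        }
    }

    private final AtomicReference<State> state;

    CasMovingAverage(int windowSize) {
        this.state = new AtomicReference<>(new State(new long[windowSize], 0, 0));
    }

    double record(long value) {
        State prev;
        State next;
        do {
            prev = state.get();
            int idx = (int) (prev.count % prev.observations.length);
            // The whole buffer is copied so the window, sum and count swap in as one reference.
            long[] copy = Arrays.copyOf(prev.observations, prev.observations.length);
            long delta = value - copy[idx];
            copy[idx] = value;
            next = new State(copy, prev.sum + delta, prev.count + 1);
        } while (!state.compareAndSet(prev, next));
        return (double) next.sum / Math.min(next.count, next.observations.length);
    }
}

The per-call array copy is what makes the cost grow with the window size in the measurements below.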
Benchmark results comparing the existing approach vs. CAS backed by an immutable ring-buffer.
Using synchronized (current approach):

|            | 10 observations | 100 observations | 1000 observations |
|------------|-----------------|------------------|-------------------|
| 1 thread   | 14.123 ns/op    | 14.258 ns/op     | 14.544 ns/op      |
| 4 threads  | 368.087 ns/op   | 364.879 ns/op    | 378.378 ns/op     |
| 16 threads | 1400.703 ns/op  | 1506.456 ns/op   | 1809.835 ns/op    |
Using compare-and-set backed by an immutable ring-buffer (implementation reference):
|            | 10 observations | 100 observations | 1000 observations |
|------------|-----------------|------------------|-------------------|
| 1 thread   | 27.438 ns/op    | 112.920 ns/op    | 1082.057 ns/op    |
| 4 threads  | 798.966 ns/op   | 1077.040 ns/op   | 4675.375 ns/op    |
| 16 threads | 5820.276 ns/op  | 8374.579 ns/op   | 36474.605 ns/op   |
Key observations:
- The synchronized approach doesn't have to clone the backing buffer, so its time complexity doesn't grow with the window size. This is a major drawback of the CAS approach, as the repeated array copying is expensive and also creates a lot of GC overhead.
- Even with smaller window sizes, the performance of CAS degrades as the number of threads (and the contention) grows.
- The synchronized approach performed between 2–20x better across these measurements.
synchronized (this) {
    refill();

    if (tokens >= n) {
        tokens -= n;
        return true;
    }

    return false;
}
Can we use CAS here as well?
This should be doable. Will make the changes.
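For reference, a lock-free variant could keep the bucket's state behind a single AtomicReference and retry the update with compareAndSet; this is only a sketch with assumed field names, not necessarily the change that will be made:

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

class CasTokenBucket {
    // Immutable snapshot so that refill + consume can be applied as one atomic swap.
    private static final class State {
        final double tokens;
        final long lastRefilledAt;

        State(double tokens, long lastRefilledAt) {
            this.tokens = tokens;
            this.lastRefilledAt = lastRefilledAt;
        }
    }

    private final LongSupplier clock;   // e.g. System::nanoTime
    private final double rate;          // tokens added per clock unit
    private final double burst;         // maximum number of tokens the bucket can hold
    private final AtomicReference<State> state;

    CasTokenBucket(LongSupplier clock, double rate, double burst) {
        this.clock = clock;
        this.rate = rate;
        this.burst = burst;
        this.state = new AtomicReference<>(new State(burst, clock.getAsLong()));
    }

    boolean request(double n) {
        State current;
        State next;
        do {
            current = state.get();
            long now = clock.getAsLong();
            // Refill based on elapsed time, capped at the burst size.
            double tokens = Math.min(burst, current.tokens + (now - current.lastRefilledAt) * rate);
            if (tokens < n) {
                return false; // not enough tokens; no state change required
            }
            next = new State(tokens - n, now);
        } while (!state.compareAndSet(current, next));
        return true;
    }
}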
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class TokenBucketTests extends OpenSearchTestCase {
We might need multi-threaded tests to guarantee thread safety of all the new utilities we are building.
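For example, a test along these lines could exercise a shared instance from multiple threads and assert the aggregate result (a sketch; the MovingAverage constructor, its package, and the base-class import are assumptions):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.opensearch.test.OpenSearchTestCase;

public class MovingAverageConcurrencyTests extends OpenSearchTestCase {

    public void testConcurrentRecords() throws Exception {
        int threads = 8;
        int iterationsPerThread = 10_000;
        MovingAverage avg = new MovingAverage(100); // assumed constructor taking the window size

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);

        for (int i = 0; i < threads; i++) {
            executor.submit(() -> {
                try {
                    start.await(); // release all threads together to maximize contention
                    for (int j = 0; j < iterationsPerThread; j++) {
                        avg.record(1);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }

        start.countDown();
        assertTrue(done.await(30, TimeUnit.SECONDS));
        executor.shutdownNow();

        // Every observation is 1, so a thread-safe implementation must report exactly 1.0
        // no matter how the recordings interleaved.
        assertEquals(1.0, avg.record(1), 0.0d);
    }
}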
Resolved review threads (outdated):
- server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java
- server/src/main/java/org/opensearch/search/backpressure/trackers/NodeResourceUsageTracker.java
public long incrementCancellations() {
    return cancellations.incrementAndGet();
}

public long getCancellations() {
    return cancellations.get();
}
Can we delegate this to TaskCancellation instead?
Here we are tracking the total cancellations done by a task resource usage tracker.
On the other hand, TaskCancellation is a wrapper for a single cancellable task along with the list of cancellation reasons and callbacks. The callbacks in it are responsible for calling incrementCancellations() on the relevant trackers.
Snippet for reference:
TaskCancellation getTaskCancellation(CancellableTask task) {
    List<TaskCancellation.Reason> reasons = new ArrayList<>();
    List<Runnable> callbacks = new ArrayList<>();

    for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
        Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
        if (reason.isPresent()) {
            reasons.add(reason.get());
            callbacks.add(tracker::incrementCancellations);
        }
    }

    return new TaskCancellation(task, reasons, callbacks);
}
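On the cancellation side the flow is roughly the inverse: TaskCancellation cancels the wrapped task and then runs those callbacks, which is what bumps each tracker's counter. A rough sketch of how such a cancel() could look (the reason string and method body are illustrative, not the exact code in the PR):

public void cancel() {
    // Cancel the underlying task; the reason string here is simplified.
    task.cancel("resource consumption exceeded threshold");

    // Notify every tracker that contributed a reason. These are the
    // tracker::incrementCancellations callbacks registered in getTaskCancellation().
    for (Runnable callback : callbacks) {
        callback.run();
    }
}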
Resolved review threads (outdated):
- server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureState.java
- server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java
Force-pushed 64b3123 to 8d46e74.
Force-pushed 8d46e74 to 69a2ff8.
Thanks @ketanv3, please add links to follow-ups for:
- Multi-threaded tests
- Coordinator cancellation at parent
…sumption This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. Signed-off-by: Ketan Verma <[email protected]>
Force-pushed 2421979 to 3c19186.
Force-pushed 3c19186 to bfbebef.
Force-pushed bfbebef to 6282515.
Signed-off-by: Ketan Verma <[email protected]>
Force-pushed 6282515 to 6424470.
…on resource consumption (opensearch-project#4575) This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. Signed-off-by: Ketan Verma <[email protected]>
…ource consumption (#5039) * [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (#4575) This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. * [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805) 1. CpuUsageTracker: cancels tasks if they consume too much CPU 2. ElapsedTimeTracker: cancels tasks if they consume too much time 3. HeapUsageTracker: cancels tasks if they consume too much heap * [Backport 2.x]Added search backpressure stats API Added search backpressure stats to the existing node/stats API to describe: 1. the number of cancellations (currently for SearchShardTask only) 2. the current state of TaskResourceUsageTracker Signed-off-by: Ketan Verma <[email protected]>
…ource consumption (opensearch-project#5039) * [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (opensearch-project#4575) This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. * [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (opensearch-project#4805) 1. CpuUsageTracker: cancels tasks if they consume too much CPU 2. ElapsedTimeTracker: cancels tasks if they consume too much time 3. HeapUsageTracker: cancels tasks if they consume too much heap * [Backport 2.x]Added search backpressure stats API Added search backpressure stats to the existing node/stats API to describe: 1. the number of cancellations (currently for SearchShardTask only) 2. the current state of TaskResourceUsageTracker Signed-off-by: Ketan Verma <[email protected]>
…ource consumption (#5039) * [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (#4575) This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. * [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805) 1. CpuUsageTracker: cancels tasks if they consume too much CPU 2. ElapsedTimeTracker: cancels tasks if they consume too much time 3. HeapUsageTracker: cancels tasks if they consume too much heap * [Backport 2.x]Added search backpressure stats API Added search backpressure stats to the existing node/stats API to describe: 1. the number of cancellations (currently for SearchShardTask only) 2. the current state of TaskResourceUsageTracker Signed-off-by: Ketan Verma <[email protected]> (cherry picked from commit 7c521b9)
…ource consumption (#5039) (#5058) * [Backport 2.x] Added in-flight cancellation of SearchShardTask based on resource consumption (#4575) This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. * [Backport 2.x] Added resource usage trackers for in-flight cancellation of SearchShardTask (#4805) 1. CpuUsageTracker: cancels tasks if they consume too much CPU 2. ElapsedTimeTracker: cancels tasks if they consume too much time 3. HeapUsageTracker: cancels tasks if they consume too much heap * [Backport 2.x]Added search backpressure stats API Added search backpressure stats to the existing node/stats API to describe: 1. the number of cancellations (currently for SearchShardTask only) 2. the current state of TaskResourceUsageTracker Signed-off-by: Ketan Verma <[email protected]> (cherry picked from commit 7c521b9) Co-authored-by: Ketan Verma <[email protected]>
…sumption (opensearch-project#4575) * Added in-flight cancellation of SearchShardTask based on resource consumption This feature aims to identify and cancel resource intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes in duress and degrade the cluster performance. Signed-off-by: Ketan Verma <[email protected]>
Description
This feature aims to identify and cancel resource-intensive SearchShardTasks if they have breached certain thresholds. This will help in terminating problematic queries which can put nodes under duress and degrade cluster performance.
This PR is intended to add the core framework for in-flight cancellation of search requests. The actual implementations of ResourceUsageTrackers (which decide if a task should be cancelled), along with the node stats API, will be added in a later PR.
Issues Resolved
#1181
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.
Signed-off-by: Ketan Verma [email protected]