From d76aa3de4f86ff70ee5e4abef62d000accf4982b Mon Sep 17 00:00:00 2001 From: Nick Telford Date: Fri, 24 May 2024 19:31:41 +0100 Subject: [PATCH] KAFKA-15541: Use LongAdder instead of AtomicInteger `LongAdder` performs better than `AtomicInteger` when under contention from many threads. Since it's possible that many Interactive Query threads could create a large number of `KeyValueIterator`s, we don't want contention on a metric to be a performance bottleneck. The trade-off is memory, as `LongAdder` uses more memory to space out independent counters across different cache lines. In practice, I don't expect this to cause too many problems, as we're only constructing 1 per-store. --- .../state/internals/MeteredKeyValueStore.java | 14 +++++++------- .../MeteredMultiVersionedKeyQueryIterator.java | 10 +++++----- .../state/internals/MeteredSessionStore.java | 6 +++--- .../internals/MeteredTimestampedKeyValueStore.java | 4 ++-- .../state/internals/MeteredWindowStore.java | 6 +++--- .../internals/MeteredWindowStoreIterator.java | 10 +++++----- .../internals/MeteredWindowedKeyValueIterator.java | 10 +++++----- .../state/internals/metrics/StateStoreMetrics.java | 2 +- .../state/internals/MeteredKeyValueStoreTest.java | 6 +++--- .../state/internals/MeteredSessionStoreTest.java | 6 +++--- .../MeteredTimestampedKeyValueStoreTest.java | 6 +++--- .../MeteredVersionedKeyValueStoreTest.java | 6 +++--- .../state/internals/MeteredWindowStoreTest.java | 6 +++--- 13 files changed, 46 insertions(+), 46 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 85a7fe9e6bea1..f4c2b3b6a9903 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -52,7 +52,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -95,7 +95,7 @@ public class MeteredKeyValueStore private StreamsMetricsImpl streamsMetrics; private TaskId taskId; - protected AtomicInteger numOpenIterators = new AtomicInteger(0); + protected LongAdder numOpenIterators = new LongAdder(); @SuppressWarnings("rawtypes") private final Map queryHandlers = @@ -168,7 +168,7 @@ private void registerMetrics() { e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> numOpenIterators.get()); + (config, now) -> numOpenIterators.sum()); } protected Serde prepareValueSerdeForStore(final Serde valueSerde, final SerdeGetter getter) { @@ -467,7 +467,7 @@ private MeteredKeyValueIterator(final KeyValueIterator iter, this.iter = iter; this.sensor = sensor; this.startNs = time.nanoseconds(); - numOpenIterators.incrementAndGet(); + numOpenIterators.increment(); } @Override @@ -491,7 +491,7 @@ public void close() { final long duration = time.nanoseconds() - startNs; sensor.record(duration); iteratorDurationSensor.record(duration); - numOpenIterators.decrementAndGet(); + numOpenIterators.decrement(); } } @@ -515,7 +515,7 @@ private MeteredKeyValueTimestampedIterator(final KeyValueIterator this.sensor = sensor; this.valueDeserializer = valueDeserializer; this.startNs = time.nanoseconds(); - numOpenIterators.incrementAndGet(); + numOpenIterators.increment(); } @Override @@ -539,7 +539,7 @@ public void close() { final long duration = time.nanoseconds() - startNs; sensor.record(duration); iteratorDurationSensor.record(duration); - numOpenIterators.decrementAndGet(); + numOpenIterators.decrement(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java index 4663ef5abbfed..92c315c09be99 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.streams.state.internals; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; import org.apache.kafka.common.metrics.Sensor; @@ -28,7 +28,7 @@ public class MeteredMultiVersionedKeyQueryIterator implements VersionedRecord private final VersionedRecordIterator iterator; private final Function, VersionedRecord> deserializeValue; - private final AtomicInteger numOpenIterators; + private final LongAdder numOpenIterators; private final Sensor sensor; private final Time time; private final long startNs; @@ -37,14 +37,14 @@ public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator, VersionedRecord> deserializeValue, - final AtomicInteger numOpenIterators) { + final LongAdder numOpenIterators) { this.iterator = iterator; this.deserializeValue = deserializeValue; this.numOpenIterators = numOpenIterators; this.sensor = sensor; this.time = time; this.startNs = time.nanoseconds(); - numOpenIterators.incrementAndGet(); + numOpenIterators.increment(); } @@ -54,7 +54,7 @@ public void close() { iterator.close(); } finally { sensor.record(time.nanoseconds() - startNs); - numOpenIterators.decrementAndGet(); + numOpenIterators.decrement(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java index 0cb9445a92c2f..5208a0cf671c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java @@ -47,7 +47,7 @@ import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -72,7 +72,7 @@ public class MeteredSessionStore private InternalProcessorContext context; private TaskId taskId; - private AtomicInteger numOpenIterators = new AtomicInteger(0); + private LongAdder numOpenIterators = new LongAdder(); @SuppressWarnings("rawtypes") private final Map queryHandlers = @@ -137,7 +137,7 @@ private void registerMetrics() { e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> numOpenIterators.get()); + (config, now) -> numOpenIterators.sum()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java index 3c39998389385..757a1bd868cf0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java @@ -325,7 +325,7 @@ private MeteredTimestampedKeyValueStoreIterator(final KeyValueIterator private InternalProcessorContext context; private TaskId taskId; - private AtomicInteger numOpenIterators = new AtomicInteger(0); + private LongAdder numOpenIterators = new LongAdder(); @SuppressWarnings("rawtypes") private final Map queryHandlers = @@ -156,7 +156,7 @@ private void registerMetrics() { e2eLatencySensor = StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), streamsMetrics); iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics); StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics, - (config, now) -> numOpenIterators.get()); + (config, now) -> numOpenIterators.sum()); } @Deprecated diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java index 1294cfc1f4504..62fb7b1b24867 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; class MeteredWindowStoreIterator implements WindowStoreIterator { @@ -34,7 +34,7 @@ class MeteredWindowStoreIterator implements WindowStoreIterator { private final Function valueFrom; private final long startNs; private final Time time; - private final AtomicInteger numOpenIterators; + private final LongAdder numOpenIterators; MeteredWindowStoreIterator(final WindowStoreIterator iter, final Sensor operationSensor, @@ -42,7 +42,7 @@ class MeteredWindowStoreIterator implements WindowStoreIterator { final StreamsMetrics metrics, final Function valueFrom, final Time time, - final AtomicInteger numOpenIterators) { + final LongAdder numOpenIterators) { this.iter = iter; this.operationSensor = operationSensor; this.iteratorSensor = iteratorSensor; @@ -51,7 +51,7 @@ class MeteredWindowStoreIterator implements WindowStoreIterator { this.startNs = time.nanoseconds(); this.time = time; this.numOpenIterators = numOpenIterators; - numOpenIterators.incrementAndGet(); + numOpenIterators.increment(); } @Override @@ -73,7 +73,7 @@ public void close() { final long duration = time.nanoseconds() - startNs; operationSensor.record(duration); iteratorSensor.record(duration); - numOpenIterators.decrementAndGet(); + numOpenIterators.decrement(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java index e69b27c2c8ed5..e663a2b3a2036 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.KeyValueIterator; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; class MeteredWindowedKeyValueIterator implements KeyValueIterator, V> { @@ -37,7 +37,7 @@ class MeteredWindowedKeyValueIterator implements KeyValueIterator deserializeValue; private final long startNs; private final Time time; - private final AtomicInteger numOpenIterators; + private final LongAdder numOpenIterators; MeteredWindowedKeyValueIterator(final KeyValueIterator, byte[]> iter, final Sensor operationSensor, @@ -46,7 +46,7 @@ class MeteredWindowedKeyValueIterator implements KeyValueIterator deserializeKey, final Function deserializeValue, final Time time, - final AtomicInteger numOpenIterators) { + final LongAdder numOpenIterators) { this.iter = iter; this.operationSensor = operationSensor; this.iteratorSensor = iteratorSensor; @@ -56,7 +56,7 @@ class MeteredWindowedKeyValueIterator implements KeyValueIterator numOpenIteratorsGauge) { + final Gauge numOpenIteratorsGauge) { streamsMetrics.addStoreLevelMutableMetric( taskId, storeType, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java index d2227bc69b103..7e78084027e2c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java @@ -451,13 +451,13 @@ public void shouldTrackOpenIteratorsMetric() { final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); assertThat(openIteratorsMetric, not(nullValue())); - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); try (final KeyValueIterator iterator = metered.prefixScan(KEY, stringSerializer)) { - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L)); } - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java index 3c5a923e6d42b..994a410bc5ef7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java @@ -615,13 +615,13 @@ public void shouldTrackOpenIteratorsMetric() { final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); assertThat(openIteratorsMetric, not(nullValue())); - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); try (final KeyValueIterator, String> iterator = store.backwardFetch(KEY)) { - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L)); } - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java index 7e37440a075bd..9fbd6ea93caf7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java @@ -449,13 +449,13 @@ public void shouldTrackOpenIteratorsMetric() { final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); assertThat(openIteratorsMetric, not(nullValue())); - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); try (final KeyValueIterator> iterator = metered.all()) { - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L)); } - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java index 10b8e2ec6fb1c..09159427c5ac3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java @@ -380,15 +380,15 @@ public void shouldTrackOpenIteratorsMetric() { final KafkaMetric openIteratorsMetric = getMetric("num-open-iterators"); assertThat(openIteratorsMetric, not(nullValue())); - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); final QueryResult> result = store.query(query, bound, config); try (final VersionedRecordIterator iterator = result.getResult()) { - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L)); } - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java index aa637035c7756..c57bd1956f4d3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java @@ -456,13 +456,13 @@ public void shouldTrackOpenIteratorsMetric() { final KafkaMetric openIteratorsMetric = metric("num-open-iterators"); assertThat(openIteratorsMetric, not(nullValue())); - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); try (final KeyValueIterator, String> iterator = store.all()) { - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(1)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L)); } - assertThat((Integer) openIteratorsMetric.metricValue(), equalTo(0)); + assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L)); } @Test