Skip to content

Commit

Permalink
KAFKA-15541: Add oldest-iterator-open-since-ms metric (apache#16041)
Browse files Browse the repository at this point in the history
Part of [KIP-989](https://cwiki.apache.org/confluence/x/9KCzDw).

This new `StateStore` metric tracks the timestamp that the oldest
surviving Iterator was created.

This timestamp should continue to climb, and closely track the current
time, as old iterators are closed and new ones created. If the timestamp
remains very low (i.e. old), that suggests an Iterator has leaked, which
should enable users to isolate the affected store.

It will report no data when there are no currently open Iterators.

Reviewers: Matthias J. Sax <[email protected]>
  • Loading branch information
nicktelford authored May 28, 2024
1 parent 4eb60b5 commit 59ba555
Show file tree
Hide file tree
Showing 15 changed files with 373 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;

/**
* Common super-interface of all Metered Iterator types.
*
* This enables tracking the timestamp the Iterator was first created, for the oldest-iterator-open-since-ms metric.
*/
public interface MeteredIterator {

/**
* @return The UNIX timestamp, in milliseconds, that this Iterator was created/opened.
*/
long startTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

Expand Down Expand Up @@ -96,6 +99,7 @@ public class MeteredKeyValueStore<K, V>
private TaskId taskId;

protected LongAdder numOpenIterators = new LongAdder();
protected NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));

@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
Expand Down Expand Up @@ -169,6 +173,9 @@ private void registerMetrics() {
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
);
}

protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, final SerdeGetter getter) {
Expand Down Expand Up @@ -456,18 +463,26 @@ protected void maybeRecordE2ELatency() {
}
}

private class MeteredKeyValueIterator implements KeyValueIterator<K, V> {
private class MeteredKeyValueIterator implements KeyValueIterator<K, V>, MeteredIterator {

private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final long startTimestamp;

private MeteredKeyValueIterator(final KeyValueIterator<Bytes, byte[]> iter,
final Sensor sensor) {
this.iter = iter;
this.sensor = sensor;
this.startTimestamp = time.milliseconds();
this.startNs = time.nanoseconds();
numOpenIterators.increment();
openIterators.add(this);
}

@Override
public long startTimestamp() {
return startTimestamp;
}

@Override
Expand All @@ -492,6 +507,7 @@ public void close() {
sensor.record(duration);
iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

Expand All @@ -501,11 +517,12 @@ public K peekNextKey() {
}
}

private class MeteredKeyValueTimestampedIterator implements KeyValueIterator<K, V> {
private class MeteredKeyValueTimestampedIterator implements KeyValueIterator<K, V>, MeteredIterator {

private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final long startTimestamp;
private final Function<byte[], V> valueDeserializer;

private MeteredKeyValueTimestampedIterator(final KeyValueIterator<Bytes, byte[]> iter,
Expand All @@ -514,8 +531,15 @@ private MeteredKeyValueTimestampedIterator(final KeyValueIterator<Bytes, byte[]>
this.iter = iter;
this.sensor = sensor;
this.valueDeserializer = valueDeserializer;
this.startTimestamp = time.milliseconds();
this.startNs = time.nanoseconds();
numOpenIterators.increment();
openIterators.add(this);
}

@Override
public long startTimestamp() {
return startTimestamp;
}

@Override
Expand All @@ -540,6 +564,7 @@ public void close() {
sensor.record(duration);
iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,47 @@
package org.apache.kafka.streams.state.internals;

import java.util.concurrent.atomic.LongAdder;
import java.util.Set;
import java.util.function.Function;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.VersionedRecord;

public class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V> {
class MeteredMultiVersionedKeyQueryIterator<V> implements VersionedRecordIterator<V>, MeteredIterator {

private final VersionedRecordIterator<byte[]> iterator;
private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue;
private final LongAdder numOpenIterators;
private final Sensor sensor;
private final Time time;
private final long startNs;
private final long startTimestampMs;
private final Set<MeteredIterator> openIterators;

public MeteredMultiVersionedKeyQueryIterator(final VersionedRecordIterator<byte[]> iterator,
final Sensor sensor,
final Time time,
final Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
final LongAdder numOpenIterators) {
final LongAdder numOpenIterators,
final Set<MeteredIterator> openIterators) {
this.iterator = iterator;
this.deserializeValue = deserializeValue;
this.numOpenIterators = numOpenIterators;
this.openIterators = openIterators;
this.sensor = sensor;
this.time = time;
this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds();
numOpenIterators.increment();
openIterators.add(this);
}

@Override
public long startTimestamp() {
return startTimestampMs;
}

@Override
public void close() {
Expand All @@ -55,6 +66,7 @@ public void close() {
} finally {
sensor.record(time.nanoseconds() - startNs);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;

import java.util.Comparator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.LongAdder;

import static org.apache.kafka.common.utils.Utils.mkEntry;
Expand All @@ -73,6 +76,7 @@ public class MeteredSessionStore<K, V>
private TaskId taskId;

private LongAdder numOpenIterators = new LongAdder();
private final NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));

@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
Expand Down Expand Up @@ -138,6 +142,9 @@ private void registerMetrics() {
iteratorDurationSensor = StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, name(), streamsMetrics,
(config, now) -> openIterators.isEmpty() ? null : openIterators.first().startTimestamp()
);
}


Expand Down Expand Up @@ -257,7 +264,8 @@ public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}

@Override
Expand All @@ -271,7 +279,8 @@ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}

Expand All @@ -286,7 +295,8 @@ public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}

@Override
Expand All @@ -300,7 +310,8 @@ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}

Expand All @@ -321,7 +332,8 @@ public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}

@Override
Expand All @@ -342,7 +354,8 @@ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}

Expand All @@ -365,7 +378,8 @@ public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}

@Override
Expand All @@ -379,7 +393,8 @@ public KeyValueIterator<Windowed<K>, V> findSessions(final long earliestSessionE
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators);
numOpenIterators,
openIterators);
}

@Override
Expand All @@ -402,7 +417,8 @@ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K keyFrom,
serdes::keyFrom,
serdes::valueFrom,
time,
numOpenIterators
numOpenIterators,
openIterators
);
}

Expand Down Expand Up @@ -474,7 +490,8 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
serdes::keyFrom,
StoreQueryUtils.getDeserializeValue(serdes, wrapped()),
time,
numOpenIterators
numOpenIterators,
openIterators
);
final QueryResult<MeteredWindowedKeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,12 @@ private <R> QueryResult<R> runRangeQuery(final Query<R> query,
}

@SuppressWarnings("unchecked")
private class MeteredTimestampedKeyValueStoreIterator implements KeyValueIterator<K, V> {
private class MeteredTimestampedKeyValueStoreIterator implements KeyValueIterator<K, V>, MeteredIterator {

private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final long startTimestampMs;
private final Function<byte[], ValueAndTimestamp<V>> valueAndTimestampDeserializer;

private final boolean returnPlainValue;
Expand All @@ -324,8 +325,15 @@ private MeteredTimestampedKeyValueStoreIterator(final KeyValueIterator<Bytes, by
this.sensor = sensor;
this.valueAndTimestampDeserializer = valueAndTimestampDeserializer;
this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds();
this.returnPlainValue = returnPlainValue;
numOpenIterators.increment();
openIterators.add(this);
}

@Override
public long startTimestamp() {
return startTimestampMs;
}

@Override
Expand Down Expand Up @@ -354,6 +362,7 @@ public void close() {
sensor.record(duration);
iteratorDurationSensor.record(duration);
numOpenIterators.decrement();
openIterators.remove(this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R> query, final
iteratorDurationSensor,
time,
StoreQueryUtils.getDeserializeValue(plainValueSerdes),
numOpenIterators
numOpenIterators,
openIterators
);
final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, typedResult);
Expand Down
Loading

0 comments on commit 59ba555

Please sign in to comment.