Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6144: option to query restoring and standby #7962

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter;
import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger;
import org.slf4j.Logger;
Expand Down Expand Up @@ -212,7 +211,7 @@ public enum State {
this.validTransitions.addAll(Arrays.asList(validTransitions));
}

public boolean isRunning() {
public boolean isRunningOrRebalancing() {
return equals(RUNNING) || equals(REBALANCING);
}

Expand Down Expand Up @@ -296,14 +295,14 @@ public State state() {
return state;
}

private boolean isRunning() {
private boolean isRunningOrRebalancing() {
synchronized (stateLock) {
return state.isRunning();
return state.isRunningOrRebalancing();
}
}

private void validateIsRunning() {
if (!isRunning()) {
private void validateIsRunningOrRebalancing() {
if (!isRunningOrRebalancing()) {
throw new IllegalStateException("KafkaStreams is not running. State is " + state + ".");
}
}
Expand Down Expand Up @@ -738,7 +737,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
adminClient = clientSupplier.getAdmin(config.getAdminConfigs(StreamThread.getSharedAdminClientId(clientId)));

final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
final ArrayList<StreamThreadStateStoreProvider> storeProviders = new ArrayList<>();
for (int i = 0; i < threads.length; i++) {
threads[i] = StreamThread.create(
internalTopologyBuilder,
Expand Down Expand Up @@ -1003,7 +1002,7 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument
* @throws StreamsException if cleanup failed
*/
public void cleanUp() {
if (isRunning()) {
if (isRunningOrRebalancing()) {
throw new IllegalStateException("Cannot clean up while running.");
}
stateDirectory.clean();
Expand All @@ -1019,7 +1018,7 @@ public void cleanUp() {
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application
*/
public Collection<StreamsMetadata> allMetadata() {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata();
}

Expand All @@ -1039,7 +1038,7 @@ public Collection<StreamsMetadata> allMetadata() {
* this application
*/
public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName);
}

Expand Down Expand Up @@ -1081,7 +1080,7 @@ public Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
public <K> StreamsMetadata metadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer) {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer);
}

Expand Down Expand Up @@ -1114,7 +1113,7 @@ public <K> StreamsMetadata metadataForKey(final String storeName,
public <K> StreamsMetadata metadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner);
}

Expand All @@ -1130,7 +1129,7 @@ public <K> StreamsMetadata metadataForKey(final String storeName,
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer) {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, keySerializer);
}

Expand All @@ -1147,7 +1146,7 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
validateIsRunning();
validateIsRunningOrRebalancing();
return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, partitioner);
}

Expand All @@ -1156,6 +1155,10 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
* type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
* The returned object can be used to query the {@link StateStore} instances.
*
* Only permits queries on active replicas of the store (no standbys or restoring replicas).
* See {@link KafkaStreams#store(java.lang.String, org.apache.kafka.streams.state.QueryableStoreType, boolean)}
* for the option to set {@code includeStaleStores} to true and trade off consistency in favor of availability.
*
* @param storeName name of the store to find
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
* @param <T> return type
Expand All @@ -1164,8 +1167,30 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
* {@code queryableStoreType} doesn't exist
*/
public <T> T store(final String storeName, final QueryableStoreType<T> queryableStoreType) {
validateIsRunning();
return queryableStoreProvider.getStore(storeName, queryableStoreType);
return store(storeName, queryableStoreType, false);
}

/**
* Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's
* type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}.
* The returned object can be used to query the {@link StateStore} instances.
*
* @param storeName name of the store to find
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
* @param includeStaleStores If false, only permit queries on the active replica for a partition, and only if the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cosmetic: extra space at the start

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I noticed that, too. Oh well.

* task for that partition is running. I.e., the state store is not a standby replica,
* and it is not restoring from the changelog.
* If true, allow queries on standbys and restoring replicas in addition to active ones.
* @param <T> return type
* @return A facade wrapping the local {@link StateStore} instances
* @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and
* {@code queryableStoreType} doesn't exist
*/
public <T> T store(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final boolean includeStaleStores) {
validateIsRunningOrRebalancing();
return queryableStoreProvider.getStore(storeName, queryableStoreType, includeStaleStores);
}

/**
Expand All @@ -1174,7 +1199,7 @@ public <T> T store(final String storeName, final QueryableStoreType<T> queryable
* @return the set of {@link ThreadMetadata}.
*/
public Set<ThreadMetadata> localThreadsMetadata() {
validateIsRunning();
validateIsRunningOrRebalancing();
final Set<ThreadMetadata> threadMetadata = new HashSet<>();
for (final StreamThread thread : threads) {
threadMetadata.add(thread.threadMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -147,7 +148,7 @@ public enum State implements ThreadStateTransitionValidator {
this.validTransitions.addAll(Arrays.asList(validTransitions));
}

public boolean isRunning() {
public boolean isAlive() {
return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
}

Expand Down Expand Up @@ -235,14 +236,9 @@ State setState(final State newState) {
return oldState;
}

public boolean isRunningAndNotRebalancing() {
// we do not need to grab stateLock since it is a single read
return state == State.RUNNING;
}

public boolean isRunning() {
synchronized (stateLock) {
return state.isRunning();
return state.isAlive();
}
}

Expand Down Expand Up @@ -1191,10 +1187,17 @@ private void updateThreadMetadata(final Map<TaskId, StreamTask> activeTasks,
standbyTasksMetadata);
}

public Map<TaskId, StreamTask> tasks() {
public Map<TaskId, StreamTask> activeTasks() {
return taskManager.activeTasks();
}

/**
 * Collects the standby tasks and the active tasks reported by the task manager into one map.
 * A new {@link TreeMap} is built on every call.
 *
 * @return a map from {@link TaskId} to {@link Task} containing every standby and active task;
 *         if the same {@link TaskId} were present in both sets, the active task is the one kept
 */
public Map<TaskId, Task> allTasks() {
final Map<TaskId, Task> result = new TreeMap<>();
// Insert standbys first so that an active task with the same id overwrites the standby entry.
result.putAll(taskManager.standbyTasks());
result.putAll(taskManager.activeTasks());
return result;
}

/**
* Produces a string representation containing useful information about a StreamThread.
* This is useful in debugging scenarios.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@
import java.util.ArrayList;
import java.util.List;

import static java.util.Collections.singletonList;

/**
* A wrapper over all of the {@link StateStoreProvider}s in a Topology
*/
public class QueryableStoreProvider {

private final List<StateStoreProvider> storeProviders;
private final List<StreamThreadStateStoreProvider> storeProviders;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking aloud: even though we are binding to a specific StateStoreProvider implementation here, it seems fine, since there aren't really any others in a topology

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the point of this change, actually, StreamThreadStateStoreProvider is no longer a StateStoreProvider. If you need a StateStoreProvider, you have to adapt it with WrappingStoreProvider.

private final GlobalStateStoreProvider globalStoreProvider;

public QueryableStoreProvider(final List<StateStoreProvider> storeProviders,
public QueryableStoreProvider(final List<StreamThreadStateStoreProvider> storeProviders,
final GlobalStateStoreProvider globalStateStoreProvider) {
this.storeProviders = new ArrayList<>(storeProviders);
this.globalStoreProvider = globalStateStoreProvider;
Expand All @@ -45,24 +43,28 @@ public QueryableStoreProvider(final List<StateStoreProvider> storeProviders,
*
* @param storeName name of the store
* @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)}
* @param includeStaleStores if true, include standbys and recovering stores;
* if false, only include running actives.
* @param <T> The expected type of the returned store
* @return A composite object that wraps the store instances.
*/
public <T> T getStore(final String storeName,
final QueryableStoreType<T> queryableStoreType) {
final QueryableStoreType<T> queryableStoreType,
final boolean includeStaleStores) {
final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
if (!globalStore.isEmpty()) {
return queryableStoreType.create(new WrappingStoreProvider(singletonList(globalStoreProvider)), storeName);
return queryableStoreType.create(globalStoreProvider, storeName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking aloud: I guess there isn't much value in wrapping a single provider, so +1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the role of the wrapping store provider really is nothing more than just iterating over the list of providers. For global stores, there's always exactly one provider, so it's purely a performance penalty for no gain at all.

}
final List<T> allStores = new ArrayList<>();
for (final StateStoreProvider storeProvider : storeProviders) {
allStores.addAll(storeProvider.stores(storeName, queryableStoreType));
for (final StreamThreadStateStoreProvider storeProvider : storeProviders) {
allStores.addAll(storeProvider.stores(storeName, queryableStoreType, includeStaleStores));
}
if (allStores.isEmpty()) {
throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance.");
}
return queryableStoreType.create(
new WrappingStoreProvider(storeProviders),
storeName);
new WrappingStoreProvider(storeProviders, includeStaleStores),
storeName
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreType;
Expand All @@ -28,11 +29,9 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* Wrapper over StreamThread that implements StateStoreProvider
*/
public class StreamThreadStateStoreProvider implements StateStoreProvider {
public class StreamThreadStateStoreProvider {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, just Global and Wrapping StateStoreProviders exist? IIUC, most of the xxxStore classes are just accessing the Wrapping.. store provider? Makes me wonder, if we should just use the QueryableStoreProvider everywhere and cull the interface..

Anyway, I am not familiar enough with this part of the code. So I leave it to you..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also feel that we can collapse the layered interfaces a bit further after we've changed this, since the original motivation of having it is just to "stitch" the global stores and local stores together when exposing as KafkaStreams#stores.

We can consider that in a separate, cleanup PR afterwards.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, StateStoreProvider is used in the public API as part of QueryableStoreType. Its role is well defined already. That's the one that we should continue to use "everywhere".

QueryableStoreProvider is a utility for selecting either the global store to return OR packaging the thread store providers into a WrappingStoreProvider and using it to actually create a usable store. It's not a StateStoreProvider at all.

WrappingStoreProvider, on the other hand, is a StateStoreProvider, and adapts a List<StreamThreadStateStoreProvider> to the StateStoreProvider interface.

GlobalStateStoreProvider is also a StateStoreProvider.

So, if anything, we could increase clarity by renaming some of these classes to more accurately reflect their roles, but all the involved classes have well defined and necessary roles right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang I think we wrote our comments at the same time. Just to respond directly, I do think that we can clean up the code base in a follow-on change (by renaming stuff that's named misleadingly now), but I don't think we can collapse the hierarchy at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! SGTM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can work on the follow-on change.

WrappingStoreProvider -> WrappingStateStoreProvider
GlobalStoreProvider -> GlobalStateStoreProvider

QueryableStoreProvider is left as is.


private final StreamThread streamThread;

Expand All @@ -41,33 +40,40 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) {
}

@SuppressWarnings("unchecked")
@Override
public <T> List<T> stores(final String storeName, final QueryableStoreType<T> queryableStoreType) {
public <T> List<T> stores(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final boolean includeStaleStores) {
if (streamThread.state() == StreamThread.State.DEAD) {
return Collections.emptyList();
}
if (!streamThread.isRunningAndNotRebalancing()) {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
streamThread.state() + ", not RUNNING");
}
final List<T> stores = new ArrayList<>();
for (final Task streamTask : streamThread.tasks().values()) {
final StateStore store = streamTask.getStore(storeName);
if (store != null && queryableStoreType.accepts(store)) {
if (!store.isOpen()) {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask +
" because the store is not open. The state store may have migrated to another instances.");
}
if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
stores.add((T) new ReadOnlyKeyValueStoreFacade((TimestampedKeyValueStore<Object, Object>) store));
} else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
stores.add((T) new ReadOnlyWindowStoreFacade((TimestampedWindowStore<Object, Object>) store));
} else {
stores.add((T) store);
final StreamThread.State state = streamThread.state();
if (includeStaleStores ? state.isAlive() : state == StreamThread.State.RUNNING) {
final Map<TaskId, ? extends Task> tasks = includeStaleStores ? streamThread.allTasks() : streamThread.activeTasks();
final List<T> stores = new ArrayList<>();
for (final Task streamTask : tasks.values()) {
final StateStore store = streamTask.getStore(storeName);
if (store != null && queryableStoreType.accepts(store)) {
if (!store.isOpen()) {
throw new InvalidStateStoreException(
"Cannot get state store " + storeName + " for task " + streamTask +
" because the store is not open. " +
"The state store may have migrated to another instances.");
}
if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
stores.add((T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store));
} else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
stores.add((T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store));
} else {
stores.add((T) store);
}
}
}
return stores;
} else {
throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " +
state + ", not RUNNING" +
(includeStaleStores ? " or REBALANCING" : ""));
}
return stores;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.QueryableStoreType;

import java.util.ArrayList;
Expand All @@ -28,25 +27,21 @@
*/
public class WrappingStoreProvider implements StateStoreProvider {

private final List<StateStoreProvider> storeProviders;
private final List<StreamThreadStateStoreProvider> storeProviders;
private final boolean includeStaleStores;

WrappingStoreProvider(final List<StateStoreProvider> storeProviders) {
WrappingStoreProvider(final List<StreamThreadStateStoreProvider> storeProviders,
final boolean includeStaleStores) {
this.storeProviders = storeProviders;
this.includeStaleStores = includeStaleStores;
}

/**
* Provides access to {@link org.apache.kafka.streams.processor.StateStore}s accepted
* by {@link QueryableStoreType#accepts(StateStore)}
* @param storeName name of the store
* @param type The {@link QueryableStoreType}
* @param <T> The type of the Store, for example, {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore}
* @return a List of all the stores with the storeName and are accepted by {@link QueryableStoreType#accepts(StateStore)}
*/
@Override
public <T> List<T> stores(final String storeName,
final QueryableStoreType<T> type) {
final QueryableStoreType<T> queryableStoreType) {
final List<T> allStores = new ArrayList<>();
for (final StateStoreProvider provider : storeProviders) {
final List<T> stores = provider.stores(storeName, type);
for (final StreamThreadStateStoreProvider provider : storeProviders) {
final List<T> stores = provider.stores(storeName, queryableStoreType, includeStaleStores);
allStores.addAll(stores);
}
if (allStores.isEmpty()) {
Expand Down
Loading