diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index d9f036bb61397..37dfcf0efa98b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -61,7 +61,6 @@ import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider; import org.apache.kafka.streams.state.internals.QueryableStoreProvider; import org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter; -import org.apache.kafka.streams.state.internals.StateStoreProvider; import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider; import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger; import org.slf4j.Logger; @@ -212,7 +211,7 @@ public enum State { this.validTransitions.addAll(Arrays.asList(validTransitions)); } - public boolean isRunning() { + public boolean isRunningOrRebalancing() { return equals(RUNNING) || equals(REBALANCING); } @@ -296,14 +295,14 @@ public State state() { return state; } - private boolean isRunning() { + private boolean isRunningOrRebalancing() { synchronized (stateLock) { - return state.isRunning(); + return state.isRunningOrRebalancing(); } } - private void validateIsRunning() { - if (!isRunning()) { + private void validateIsRunningOrRebalancing() { + if (!isRunningOrRebalancing()) { throw new IllegalStateException("KafkaStreams is not running. State is " + state + "."); } } @@ -738,7 +737,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, adminClient = clientSupplier.getAdmin(config.getAdminConfigs(StreamThread.getSharedAdminClientId(clientId))); final Map threadState = new HashMap<>(threads.length); - final ArrayList storeProviders = new ArrayList<>(); + final ArrayList storeProviders = new ArrayList<>(); for (int i = 0; i < threads.length; i++) { threads[i] = StreamThread.create( internalTopologyBuilder, @@ -1003,7 +1002,7 @@ public synchronized boolean close(final Duration timeout) throws IllegalArgument * @throws StreamsException if cleanup failed */ public void cleanUp() { - if (isRunning()) { + if (isRunningOrRebalancing()) { throw new IllegalStateException("Cannot clean up while running."); } stateDirectory.clean(); @@ -1019,7 +1018,7 @@ public void cleanUp() { * @return {@link StreamsMetadata} for each {@code KafkaStreams} instances of this application */ public Collection allMetadata() { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getAllMetadata(); } @@ -1039,7 +1038,7 @@ public Collection allMetadata() { * this application */ public Collection allMetadataForStore(final String storeName) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getAllMetadataForStore(storeName); } @@ -1081,7 +1080,7 @@ public Collection allMetadataForStore(final String storeName) { public StreamsMetadata metadataForKey(final String storeName, final K key, final Serializer keySerializer) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getMetadataWithKey(storeName, key, keySerializer); } @@ -1114,7 +1113,7 @@ public StreamsMetadata metadataForKey(final String storeName, public StreamsMetadata metadataForKey(final String storeName, final K key, final StreamPartitioner partitioner) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getMetadataWithKey(storeName, key, partitioner); } @@ -1130,7 +1129,7 @@ public StreamsMetadata metadataForKey(final String storeName, public KeyQueryMetadata queryMetadataForKey(final String storeName, final K key, final Serializer keySerializer) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, keySerializer); } @@ -1147,7 +1146,7 @@ public KeyQueryMetadata queryMetadataForKey(final String storeName, public KeyQueryMetadata queryMetadataForKey(final String storeName, final K key, final StreamPartitioner partitioner) { - validateIsRunning(); + validateIsRunningOrRebalancing(); return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, partitioner); } @@ -1156,6 +1155,10 @@ public KeyQueryMetadata queryMetadataForKey(final String storeName, * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}. * The returned object can be used to query the {@link StateStore} instances. * + * Only permits queries on active replicas of the store (no standbys or restoring replicas). + * See {@link KafkaStreams#store(java.lang.String, org.apache.kafka.streams.state.QueryableStoreType, boolean)} + * for the option to set {@code includeStaleStores} to true and trade off consistency in favor of availability. + * * @param storeName name of the store to find * @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} * @param return type @@ -1164,8 +1167,30 @@ public KeyQueryMetadata queryMetadataForKey(final String storeName, * {@code queryableStoreType} doesn't exist */ public T store(final String storeName, final QueryableStoreType queryableStoreType) { - validateIsRunning(); - return queryableStoreProvider.getStore(storeName, queryableStoreType); + return store(storeName, queryableStoreType, false); + } + + /** + * Get a facade wrapping the local {@link StateStore} instances with the provided {@code storeName} if the Store's + * type is accepted by the provided {@link QueryableStoreType#accepts(StateStore) queryableStoreType}. + * The returned object can be used to query the {@link StateStore} instances. + * + * @param storeName name of the store to find + * @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)} + * @param includeStaleStores If false, only permit queries on the active replica for a partition, and only if the + * task for that partition is running. I.e., the state store is not a standby replica, + * and it is not restoring from the changelog. + * If true, allow queries on standbys and restoring replicas in addition to active ones. + * @param return type + * @return A facade wrapping the local {@link StateStore} instances + * @throws InvalidStateStoreException if Kafka Streams is (re-)initializing or a store with {@code storeName} and + * {@code queryableStoreType} doesn't exist + */ + public T store(final String storeName, + final QueryableStoreType queryableStoreType, + final boolean includeStaleStores) { + validateIsRunningOrRebalancing(); + return queryableStoreProvider.getStore(storeName, queryableStoreType, includeStaleStores); } /** @@ -1174,7 +1199,7 @@ public T store(final String storeName, final QueryableStoreType queryable * @return the set of {@link ThreadMetadata}. */ public Set localThreadsMetadata() { - validateIsRunning(); + validateIsRunningOrRebalancing(); final Set threadMetadata = new HashSet<>(); for (final StreamThread thread : threads) { threadMetadata.add(thread.threadMetadata()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 5abdef1f451ed..60fa1fd9f2e93 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -147,7 +148,7 @@ public enum State implements ThreadStateTransitionValidator { this.validTransitions.addAll(Arrays.asList(validTransitions)); } - public boolean isRunning() { + public boolean isAlive() { return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED); } @@ -235,14 +236,9 @@ State setState(final State newState) { return oldState; } - public boolean isRunningAndNotRebalancing() { - // we do not need to grab stateLock since it is a single read - return state == State.RUNNING; - } - public boolean isRunning() { synchronized (stateLock) { - return state.isRunning(); + return state.isAlive(); } } @@ -1191,10 +1187,17 @@ private void updateThreadMetadata(final Map activeTasks, standbyTasksMetadata); } - public Map tasks() { + public Map activeTasks() { return taskManager.activeTasks(); } + public Map allTasks() { + final Map result = new TreeMap<>(); + result.putAll(taskManager.standbyTasks()); + result.putAll(taskManager.activeTasks()); + return result; + } + /** * Produces a string representation containing useful information about a StreamThread. * This is useful in debugging scenarios. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java index 2bfecd287e18f..b25207edba813 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java @@ -23,17 +23,15 @@ import java.util.ArrayList; import java.util.List; -import static java.util.Collections.singletonList; - /** * A wrapper over all of the {@link StateStoreProvider}s in a Topology */ public class QueryableStoreProvider { - private final List storeProviders; + private final List storeProviders; private final GlobalStateStoreProvider globalStoreProvider; - public QueryableStoreProvider(final List storeProviders, + public QueryableStoreProvider(final List storeProviders, final GlobalStateStoreProvider globalStateStoreProvider) { this.storeProviders = new ArrayList<>(storeProviders); this.globalStoreProvider = globalStateStoreProvider; @@ -45,24 +43,28 @@ public QueryableStoreProvider(final List storeProviders, * * @param storeName name of the store * @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)} + * @param includeStaleStores if true, include standbys and recovering stores; + * if false, only include running actives. * @param The expected type of the returned store * @return A composite object that wraps the store instances. */ public T getStore(final String storeName, - final QueryableStoreType queryableStoreType) { + final QueryableStoreType queryableStoreType, + final boolean includeStaleStores) { final List globalStore = globalStoreProvider.stores(storeName, queryableStoreType); if (!globalStore.isEmpty()) { - return queryableStoreType.create(new WrappingStoreProvider(singletonList(globalStoreProvider)), storeName); + return queryableStoreType.create(globalStoreProvider, storeName); } final List allStores = new ArrayList<>(); - for (final StateStoreProvider storeProvider : storeProviders) { - allStores.addAll(storeProvider.stores(storeName, queryableStoreType)); + for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { + allStores.addAll(storeProvider.stores(storeName, queryableStoreType, includeStaleStores)); } if (allStores.isEmpty()) { throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); } return queryableStoreType.create( - new WrappingStoreProvider(storeProviders), - storeName); + new WrappingStoreProvider(storeProviders, includeStaleStores), + storeName + ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 53c5cc0210672..70c87ae191bc1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -18,6 +18,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.state.QueryableStoreType; @@ -28,11 +29,9 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; -/** - * Wrapper over StreamThread that implements StateStoreProvider - */ -public class StreamThreadStateStoreProvider implements StateStoreProvider { +public class StreamThreadStateStoreProvider { private final StreamThread streamThread; @@ -41,33 +40,40 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) { } @SuppressWarnings("unchecked") - @Override - public List stores(final String storeName, final QueryableStoreType queryableStoreType) { + public List stores(final String storeName, + final QueryableStoreType queryableStoreType, + final boolean includeStaleStores) { if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } - if (!streamThread.isRunningAndNotRebalancing()) { - throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + - streamThread.state() + ", not RUNNING"); - } - final List stores = new ArrayList<>(); - for (final Task streamTask : streamThread.tasks().values()) { - final StateStore store = streamTask.getStore(storeName); - if (store != null && queryableStoreType.accepts(store)) { - if (!store.isOpen()) { - throw new InvalidStateStoreException("Cannot get state store " + storeName + " for task " + streamTask + - " because the store is not open. The state store may have migrated to another instances."); - } - if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { - stores.add((T) new ReadOnlyKeyValueStoreFacade((TimestampedKeyValueStore) store)); - } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { - stores.add((T) new ReadOnlyWindowStoreFacade((TimestampedWindowStore) store)); - } else { - stores.add((T) store); + final StreamThread.State state = streamThread.state(); + if (includeStaleStores ? state.isAlive() : state == StreamThread.State.RUNNING) { + final Map tasks = includeStaleStores ? streamThread.allTasks() : streamThread.activeTasks(); + final List stores = new ArrayList<>(); + for (final Task streamTask : tasks.values()) { + final StateStore store = streamTask.getStore(storeName); + if (store != null && queryableStoreType.accepts(store)) { + if (!store.isOpen()) { + throw new InvalidStateStoreException( + "Cannot get state store " + storeName + " for task " + streamTask + + " because the store is not open. " + + "The state store may have migrated to another instances."); + } + if (store instanceof TimestampedKeyValueStore && queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) { + stores.add((T) new ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore) store)); + } else if (store instanceof TimestampedWindowStore && queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) { + stores.add((T) new ReadOnlyWindowStoreFacade<>((TimestampedWindowStore) store)); + } else { + stores.add((T) store); + } } } + return stores; + } else { + throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + + state + ", not RUNNING" + + (includeStaleStores ? " or REBALANCING" : "")); } - return stores; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java index d94e7cc45d773..9231dfd7ed239 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.errors.InvalidStateStoreException; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; import java.util.ArrayList; @@ -28,25 +27,21 @@ */ public class WrappingStoreProvider implements StateStoreProvider { - private final List storeProviders; + private final List storeProviders; + private final boolean includeStaleStores; - WrappingStoreProvider(final List storeProviders) { + WrappingStoreProvider(final List storeProviders, + final boolean includeStaleStores) { this.storeProviders = storeProviders; + this.includeStaleStores = includeStaleStores; } - /** - * Provides access to {@link org.apache.kafka.streams.processor.StateStore}s accepted - * by {@link QueryableStoreType#accepts(StateStore)} - * @param storeName name of the store - * @param type The {@link QueryableStoreType} - * @param The type of the Store, for example, {@link org.apache.kafka.streams.state.ReadOnlyKeyValueStore} - * @return a List of all the stores with the storeName and are accepted by {@link QueryableStoreType#accepts(StateStore)} - */ + @Override public List stores(final String storeName, - final QueryableStoreType type) { + final QueryableStoreType queryableStoreType) { final List allStores = new ArrayList<>(); - for (final StateStoreProvider provider : storeProviders) { - final List stores = provider.stores(storeName, type); + for (final StreamThreadStateStoreProvider provider : storeProviders) { + final List stores = provider.stores(storeName, queryableStoreType, includeStaleStores); allStores.addAll(stores); } if (allStores.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index f6f03795e2ce3..f4987f27f938b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -62,10 +62,8 @@ import org.junit.After; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,9 +115,6 @@ public class QueryableStateIntegrationTest { private static final int NUM_BROKERS = 1; - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); private static final int STREAM_THREE_PARTITIONS = 4; @@ -205,7 +200,7 @@ public void before() throws Exception { streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("state-" + applicationId).getPath()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); @@ -293,7 +288,7 @@ private void verifyAllKVKeys(final List streamsList, final int index = queryMetadata.getActiveHost().port(); final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams; final ReadOnlyKeyValueStore store = - streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore()); + streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore(), true); if (store == null) { nullStoreKeys.add(key); continue; @@ -352,7 +347,7 @@ private void verifyAllWindowedKeys(final List streamsList, final int index = queryMetadata.getActiveHost().port(); final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams; final ReadOnlyWindowStore store = - streamsWithKey.store(storeName, QueryableStoreTypes.windowStore()); + streamsWithKey.store(storeName, QueryableStoreTypes.windowStore(), true); if (store == null) { nullStoreKeys.add(key); continue; @@ -418,7 +413,7 @@ private void assertNoKVKeyFailures(final String storeName, } @Test - public void queryOnRebalance() throws Exception { + public void shouldBeAbleToQueryDuringRebalance() throws Exception { final int numThreads = STREAM_TWO_PARTITIONS; final List streamsList = new ArrayList<>(numThreads); final List listeners = new ArrayList<>(numThreads); @@ -431,6 +426,7 @@ public void queryOnRebalance() throws Exception { final String windowStoreName = "windowed-word-count-store"; for (int i = 0; i < numThreads; i++) { final Properties props = (Properties) streamsConfiguration.clone(); + props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("shouldBeAbleToQueryDuringRebalance-" + i).getPath()); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i); props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i); final KafkaStreams streams = @@ -509,6 +505,93 @@ public void queryOnRebalance() throws Exception { } } + @Test + public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception { + final int numThreads = STREAM_TWO_PARTITIONS; + final List streamsList = new ArrayList<>(numThreads); + final List listeners = new ArrayList<>(numThreads); + + final ProducerRunnable producerRunnable = new ProducerRunnable(streamThree, inputValues, 1); + producerRunnable.run(); + + // create stream threads + final String storeName = "word-count-store"; + final String windowStoreName = "windowed-word-count-store"; + for (int i = 0; i < numThreads; i++) { + final Properties props = (Properties) streamsConfiguration.clone(); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + i); + props.put(StreamsConfig.CLIENT_ID_CONFIG, "instance-" + i); + props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("shouldBeAbleQueryStandbyStateDuringRebalance-" + i).getPath()); + final KafkaStreams streams = + createCountStream(streamThree, outputTopicThree, outputTopicConcurrentWindowed, storeName, windowStoreName, props); + final KafkaStreamsTest.StateListenerStub listener = new KafkaStreamsTest.StateListenerStub(); + streams.setStateListener(listener); + listeners.add(listener); + streamsList.add(streams); + } + startApplicationAndWaitUntilRunning(streamsList, ofSeconds(60)); + + try { + waitUntilAtLeastNumRecordProcessed(outputTopicThree, 1); + + // Ensure each thread can serve all keys by itself; i.e standby replication works. + for (int i = 0; i < streamsList.size(); i++) { + verifyAllKVKeys( + streamsList, + streamsList.get(i), + listeners.get(i), + inputValuesKeys, + storeName + "-" + streamThree, + DEFAULT_TIMEOUT_MS, + false); + verifyAllWindowedKeys( + streamsList, + streamsList.get(i), + listeners.get(i), + inputValuesKeys, + windowStoreName + "-" + streamThree, + 0L, + WINDOW_SIZE, + DEFAULT_TIMEOUT_MS, + false); + } + + // kill N-1 threads + for (int i = 1; i < streamsList.size(); i++) { + final Duration closeTimeout = Duration.ofSeconds(60); + assertThat(String.format("Streams instance %s did not close in %d ms", i, closeTimeout.toMillis()), + streamsList.get(i).close(closeTimeout)); + } + + waitForApplicationState(streamsList.subList(1, numThreads), State.NOT_RUNNING, Duration.ofSeconds(60)); + + // Now, confirm that all the keys are still queryable on the remaining thread, regardless of the state + verifyAllKVKeys( + streamsList, + streamsList.get(0), + listeners.get(0), + inputValuesKeys, + storeName + "-" + streamThree, + DEFAULT_TIMEOUT_MS, + false); + verifyAllWindowedKeys( + streamsList, + streamsList.get(0), + listeners.get(0), + inputValuesKeys, + windowStoreName + "-" + streamThree, + 0L, + WINDOW_SIZE, + DEFAULT_TIMEOUT_MS, + false); + } finally { + for (final KafkaStreams streams : streamsList) { + streams.close(); + } + } + } + @Test public void concurrentAccesses() throws Exception { final int numIterations = 500000; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java new file mode 100644 index 0000000000000..31b8554612b02 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQuerySuite.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStoreTest; +import org.apache.kafka.streams.state.internals.CompositeReadOnlySessionStoreTest; +import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStoreTest; +import org.apache.kafka.streams.state.internals.GlobalStateStoreProviderTest; +import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProviderTest; +import org.apache.kafka.streams.state.internals.WrappingStoreProviderTest; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * This suite runs all the tests related to querying StateStores (IQ). + * + * It can be used from an IDE to selectively just run these tests. + * + * Tests ending in the word "Suite" are excluded from the gradle build because it + * already runs the component tests individually. + */ +@RunWith(Suite.class) +@Suite.SuiteClasses({ + CompositeReadOnlyKeyValueStoreTest.class, + CompositeReadOnlyWindowStoreTest.class, + CompositeReadOnlySessionStoreTest.class, + GlobalStateStoreProviderTest.class, + StreamThreadStateStoreProviderTest.class, + WrappingStoreProviderTest.class, + QueryableStateIntegrationTest.class, + }) +public class StoreQuerySuite { +} + + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index e3f0a4121e351..8e6b6acf20a7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -343,7 +343,7 @@ private static void produceSynchronously(final String topic, final List !driver.state().isRunning(), DEFAULT_TIMEOUT, "Streams didn't shut down."); + waitForCondition(() -> !driver.state().isRunningOrRebalancing(), DEFAULT_TIMEOUT, "Streams didn't shut down."); assertThat(driver.state(), is(KafkaStreams.State.ERROR)); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d8f5d5564f4d3..047c35f04fdfa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -629,7 +629,7 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo assertEquals(1, clientSupplier.producers.size()); final Producer globalProducer = clientSupplier.producers.get(0); - for (final Task task : thread.tasks().values()) { + for (final Task task : thread.activeTasks().values()) { assertSame(globalProducer, ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()); } assertSame(clientSupplier.consumer, thread.consumer); @@ -670,7 +670,7 @@ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() thread.runOnce(); - assertEquals(thread.tasks().size(), clientSupplier.producers.size()); + assertEquals(thread.activeTasks().size(), clientSupplier.producers.size()); assertSame(clientSupplier.consumer, thread.consumer); assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); } @@ -706,7 +706,7 @@ public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() { thread.shutdown(); thread.run(); - for (final Task task : thread.tasks().values()) { + for (final Task task : thread.activeTasks().values()) { assertTrue(((MockProducer) ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()).closed()); } } @@ -941,7 +941,7 @@ public void shouldNotCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFence thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.runOnce(); - assertThat(thread.tasks().size(), equalTo(1)); + assertThat(thread.activeTasks().size(), equalTo(1)); final MockProducer producer = clientSupplier.producers.get(0); // change consumer subscription from "pattern" to "manual" to be able to call .addRecords() @@ -968,7 +968,7 @@ public void shouldNotCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFence fail("Should have thrown TaskMigratedException"); } catch (final TaskMigratedException expected) { assertTrue("StreamsThread removed the fenced zombie task already, should wait for rebalance to close all zombies together.", - thread.tasks().containsKey(task1)); + thread.activeTasks().containsKey(task1)); } assertThat(producer.commitCount(), equalTo(1L)); @@ -1003,14 +1003,14 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedIn thread.runOnce(); - assertThat(thread.tasks().size(), equalTo(1)); + assertThat(thread.activeTasks().size(), equalTo(1)); clientSupplier.producers.get(0).fenceProducer(); thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); assertTrue(clientSupplier.producers.get(0).transactionInFlight()); assertFalse(clientSupplier.producers.get(0).transactionCommitted()); assertTrue(clientSupplier.producers.get(0).closed()); - assertTrue(thread.tasks().isEmpty()); + assertTrue(thread.activeTasks().isEmpty()); } @Test @@ -1042,7 +1042,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedIn thread.runOnce(); - assertThat(thread.tasks().size(), equalTo(1)); + assertThat(thread.activeTasks().size(), equalTo(1)); clientSupplier.producers.get(0).fenceProducerOnClose(); thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); @@ -1050,7 +1050,7 @@ public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedIn assertFalse(clientSupplier.producers.get(0).transactionInFlight()); assertTrue(clientSupplier.producers.get(0).transactionCommitted()); assertFalse(clientSupplier.producers.get(0).closed()); - assertTrue(thread.tasks().isEmpty()); + assertTrue(thread.activeTasks().isEmpty()); } private static class StateListenerStub implements StreamThread.StateListener { diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java index 458c9a62a8d2e..f8af40ba63d48 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyKeyValueStoreTest.java @@ -32,11 +32,11 @@ import org.junit.Before; import org.junit.Test; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -63,9 +63,10 @@ public void before() { stubProviderOne.addStore("other-store", otherUnderlyingStore); theStore = new CompositeReadOnlyKeyValueStore<>( - new WrappingStoreProvider(Arrays.asList(stubProviderOne, stubProviderTwo)), - QueryableStoreTypes.keyValueStore(), - storeName); + new WrappingStoreProvider(asList(stubProviderOne, stubProviderTwo), false), + QueryableStoreTypes.keyValueStore(), + storeName + ); } private KeyValueStore newStoreInstance() { @@ -293,8 +294,11 @@ public long approximateNumEntries() { } private CompositeReadOnlyKeyValueStore rebalancing() { - return new CompositeReadOnlyKeyValueStore<>(new WrappingStoreProvider(Collections.singletonList(new StateStoreProviderStub(true))), - QueryableStoreTypes.keyValueStore(), storeName); + return new CompositeReadOnlyKeyValueStore<>( + new WrappingStoreProvider(singletonList(new StateStoreProviderStub(true)), false), + QueryableStoreTypes.keyValueStore(), + storeName + ); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java index 23534ef553cf0..419b7b339cb7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlySessionStoreTest.java @@ -29,9 +29,9 @@ import org.junit.Test; import java.util.Arrays; -import java.util.Collections; import java.util.List; +import static java.util.Collections.singletonList; import static org.apache.kafka.test.StreamsTestUtils.toList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; @@ -55,8 +55,8 @@ public void before() { sessionStore = new CompositeReadOnlySessionStore<>( - new WrappingStoreProvider(Arrays.asList(stubProviderOne, stubProviderTwo)), - QueryableStoreTypes.sessionStore(), storeName); + new WrappingStoreProvider(Arrays.asList(stubProviderOne, stubProviderTwo), false), + QueryableStoreTypes.sessionStore(), storeName); } @Test @@ -90,8 +90,8 @@ public void shouldFindValueForKeyWhenMultiStores() { final List, Long>> keyOneResults = toList(sessionStore.fetch("key-one")); final List, Long>> keyTwoResults = toList(sessionStore.fetch("key-two")); - assertEquals(Collections.singletonList(KeyValue.pair(keyOne, 0L)), keyOneResults); - assertEquals(Collections.singletonList(KeyValue.pair(keyTwo, 10L)), keyTwoResults); + assertEquals(singletonList(KeyValue.pair(keyOne, 0L)), keyOneResults); + assertEquals(singletonList(KeyValue.pair(keyTwo, 10L)), keyTwoResults); } @Test @@ -109,9 +109,10 @@ public void shouldNotGetValueFromOtherStores() { public void shouldThrowInvalidStateStoreExceptionOnRebalance() { final CompositeReadOnlySessionStore store = new CompositeReadOnlySessionStore<>( - new StateStoreProviderStub(true), + new WrappingStoreProvider(singletonList(new StateStoreProviderStub(true)), false), QueryableStoreTypes.sessionStore(), - "whateva"); + "whateva" + ); store.fetch("a"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java index 2307b71aa33bd..6495d8070916d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStoreTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.StateStoreProviderStub; import org.apache.kafka.test.StreamsTestUtils; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -35,6 +36,10 @@ import static java.time.Instant.ofEpochMilli; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static java.util.Collections.singletonList; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import static org.junit.Assert.assertEquals; @@ -63,9 +68,10 @@ public void before() { windowStore = new CompositeReadOnlyWindowStore<>( - new WrappingStoreProvider(Arrays.asList(stubProviderOne, stubProviderTwo)), - QueryableStoreTypes.windowStore(), - storeName); + new WrappingStoreProvider(asList(stubProviderOne, stubProviderTwo), false), + QueryableStoreTypes.windowStore(), + storeName + ); } @Test @@ -116,7 +122,16 @@ public void shouldNotGetValuesFromOtherStores() { @Test(expected = InvalidStateStoreException.class) public void shouldThrowInvalidStateStoreExceptionOnRebalance() { - final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>(new StateStoreProviderStub(true), QueryableStoreTypes.windowStore(), "foo"); + final StateStoreProvider storeProvider = EasyMock.createNiceMock(StateStoreProvider.class); + EasyMock.expect(storeProvider.stores(anyString(), anyObject())) + .andThrow(new InvalidStateStoreException("store is unavailable")); + EasyMock.replay(storeProvider); + + final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>( + storeProvider, + QueryableStoreTypes.windowStore(), + "foo" + ); store.fetch("key", ofEpochMilli(1), ofEpochMilli(10)); } @@ -124,7 +139,11 @@ public void shouldThrowInvalidStateStoreExceptionOnRebalance() { public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() { underlyingWindowStore.setOpen(false); final CompositeReadOnlyWindowStore store = - new CompositeReadOnlyWindowStore<>(stubProviderOne, QueryableStoreTypes.windowStore(), "window-store"); + new CompositeReadOnlyWindowStore<>( + new WrappingStoreProvider(singletonList(stubProviderOne), false), + QueryableStoreTypes.windowStore(), + "window-store" + ); try { store.fetch("key", ofEpochMilli(1), ofEpochMilli(10)); Assert.fail("InvalidStateStoreException was expected"); @@ -136,8 +155,15 @@ public void shouldThrowInvalidStateStoreExceptionIfFetchThrows() { @Test public void emptyIteratorAlwaysReturnsFalse() { - final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>(new - StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo"); + final StateStoreProvider storeProvider = EasyMock.createNiceMock(StateStoreProvider.class); + EasyMock.expect(storeProvider.stores(anyString(), anyObject())).andReturn(emptyList()); + EasyMock.replay(storeProvider); + + final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>( + storeProvider, + QueryableStoreTypes.windowStore(), + "foo" + ); final WindowStoreIterator windowStoreIterator = store.fetch("key", ofEpochMilli(1), ofEpochMilli(10)); Assert.assertFalse(windowStoreIterator.hasNext()); @@ -145,16 +171,30 @@ public void emptyIteratorAlwaysReturnsFalse() { @Test public void emptyIteratorPeekNextKeyShouldThrowNoSuchElementException() { - final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>(new - StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo"); + final StateStoreProvider storeProvider = EasyMock.createNiceMock(StateStoreProvider.class); + EasyMock.expect(storeProvider.stores(anyString(), anyObject())).andReturn(emptyList()); + EasyMock.replay(storeProvider); + + final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>( + storeProvider, + QueryableStoreTypes.windowStore(), + "foo" + ); final WindowStoreIterator windowStoreIterator = store.fetch("key", ofEpochMilli(1), ofEpochMilli(10)); assertThrows(NoSuchElementException.class, windowStoreIterator::peekNextKey); } @Test public void emptyIteratorNextShouldThrowNoSuchElementException() { - final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>(new - StateStoreProviderStub(false), QueryableStoreTypes.windowStore(), "foo"); + final StateStoreProvider storeProvider = EasyMock.createNiceMock(StateStoreProvider.class); + EasyMock.expect(storeProvider.stores(anyString(), anyObject())).andReturn(emptyList()); + EasyMock.replay(storeProvider); + + final CompositeReadOnlyWindowStore store = new CompositeReadOnlyWindowStore<>( + storeProvider, + QueryableStoreTypes.windowStore(), + "foo" + ); final WindowStoreIterator windowStoreIterator = store.fetch("key", ofEpochMilli(1), ofEpochMilli(10)); assertThrows(NoSuchElementException.class, windowStoreIterator::next); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java index 7f5b231406f0b..de202f0cec9ab 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java @@ -46,43 +46,45 @@ public void before() { globalStateStores = new HashMap<>(); storeProvider = new QueryableStoreProvider( - Collections.singletonList(theStoreProvider), new GlobalStateStoreProvider(globalStateStores)); + Collections.singletonList(theStoreProvider), + new GlobalStateStoreProvider(globalStateStores) + ); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionIfKVStoreDoesntExist() { - storeProvider.getStore("not-a-store", QueryableStoreTypes.keyValueStore()); + storeProvider.getStore("not-a-store", QueryableStoreTypes.keyValueStore(), false); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionIfWindowStoreDoesntExist() { - storeProvider.getStore("not-a-store", QueryableStoreTypes.windowStore()); + storeProvider.getStore("not-a-store", QueryableStoreTypes.windowStore(), false); } @Test public void shouldReturnKVStoreWhenItExists() { - assertNotNull(storeProvider.getStore(keyValueStore, QueryableStoreTypes.keyValueStore())); + assertNotNull(storeProvider.getStore(keyValueStore, QueryableStoreTypes.keyValueStore(), false)); } @Test public void shouldReturnWindowStoreWhenItExists() { - assertNotNull(storeProvider.getStore(windowStore, QueryableStoreTypes.windowStore())); + assertNotNull(storeProvider.getStore(windowStore, QueryableStoreTypes.windowStore(), false)); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() { - storeProvider.getStore(windowStore, QueryableStoreTypes.keyValueStore()); + storeProvider.getStore(windowStore, QueryableStoreTypes.keyValueStore(), false); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() { - storeProvider.getStore(keyValueStore, QueryableStoreTypes.windowStore()); + storeProvider.getStore(keyValueStore, QueryableStoreTypes.windowStore(), false); } @Test public void shouldFindGlobalStores() { globalStateStores.put("global", new NoOpReadOnlyStore<>()); - assertNotNull(storeProvider.getStore("global", QueryableStoreTypes.keyValueStore())); + assertNotNull(storeProvider.getStore("global", QueryableStoreTypes.keyValueStore(), false)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index ce816195fb9db..48b1625678908 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -60,8 +60,6 @@ import java.util.Map; import java.util.Properties; -import static org.apache.kafka.streams.state.QueryableStoreTypes.timestampedWindowStore; -import static org.apache.kafka.streams.state.QueryableStoreTypes.windowStore; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; @@ -162,7 +160,7 @@ public void cleanUp() throws IOException { public void shouldFindKeyValueStores() { mockThread(true); final List> kvStores = - provider.stores("kv-store", QueryableStoreTypes.keyValueStore()); + provider.stores("kv-store", QueryableStoreTypes.keyValueStore(), false); assertEquals(2, kvStores.size()); for (final ReadOnlyKeyValueStore store: kvStores) { assertThat(store, instanceOf(ReadOnlyKeyValueStore.class)); @@ -174,7 +172,7 @@ public void shouldFindKeyValueStores() { public void shouldFindTimestampedKeyValueStores() { mockThread(true); final List>> tkvStores = - provider.stores("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore()); + provider.stores("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore(), false); assertEquals(2, tkvStores.size()); for (final ReadOnlyKeyValueStore> store: tkvStores) { assertThat(store, instanceOf(ReadOnlyKeyValueStore.class)); @@ -186,7 +184,7 @@ public void shouldFindTimestampedKeyValueStores() { public void shouldNotFindKeyValueStoresAsTimestampedStore() { mockThread(true); final List>> tkvStores = - provider.stores("kv-store", QueryableStoreTypes.timestampedKeyValueStore()); + provider.stores("kv-store", QueryableStoreTypes.timestampedKeyValueStore(), false); assertEquals(0, tkvStores.size()); } @@ -194,7 +192,7 @@ public void shouldNotFindKeyValueStoresAsTimestampedStore() { public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() { mockThread(true); final List>> tkvStores = - provider.stores("timestamped-kv-store", QueryableStoreTypes.keyValueStore()); + provider.stores("timestamped-kv-store", QueryableStoreTypes.keyValueStore(), false); assertEquals(2, tkvStores.size()); for (final ReadOnlyKeyValueStore> store: tkvStores) { assertThat(store, instanceOf(ReadOnlyKeyValueStore.class)); @@ -206,7 +204,7 @@ public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() { public void shouldFindWindowStores() { mockThread(true); final List> windowStores = - provider.stores("window-store", windowStore()); + provider.stores("window-store", QueryableStoreTypes.windowStore(), false); assertEquals(2, windowStores.size()); for (final ReadOnlyWindowStore store: windowStores) { assertThat(store, instanceOf(ReadOnlyWindowStore.class)); @@ -218,7 +216,7 @@ public void shouldFindWindowStores() { public void shouldFindTimestampedWindowStores() { mockThread(true); final List>> windowStores = - provider.stores("timestamped-window-store", timestampedWindowStore()); + provider.stores("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore(), false); assertEquals(2, windowStores.size()); for (final ReadOnlyWindowStore> store: windowStores) { assertThat(store, instanceOf(ReadOnlyWindowStore.class)); @@ -230,7 +228,7 @@ public void shouldFindTimestampedWindowStores() { public void shouldNotFindWindowStoresAsTimestampedStore() { mockThread(true); final List>> windowStores = - provider.stores("window-store", timestampedWindowStore()); + provider.stores("window-store", QueryableStoreTypes.timestampedWindowStore(), false); assertEquals(0, windowStores.size()); } @@ -238,7 +236,7 @@ public void shouldNotFindWindowStoresAsTimestampedStore() { public void shouldFindTimestampedWindowStoresAsWindowStore() { mockThread(true); final List>> windowStores = - provider.stores("timestamped-window-store", windowStore()); + provider.stores("timestamped-window-store", QueryableStoreTypes.windowStore(), false); assertEquals(2, windowStores.size()); for (final ReadOnlyWindowStore> store: windowStores) { assertThat(store, instanceOf(ReadOnlyWindowStore.class)); @@ -250,28 +248,28 @@ public void shouldFindTimestampedWindowStoresAsWindowStore() { public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() { mockThread(true); taskOne.getStore("kv-store").close(); - provider.stores("kv-store", QueryableStoreTypes.keyValueStore()); + provider.stores("kv-store", QueryableStoreTypes.keyValueStore(), false); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() { mockThread(true); taskOne.getStore("timestamped-kv-store").close(); - provider.stores("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore()); + provider.stores("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore(), false); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() { mockThread(true); taskOne.getStore("window-store").close(); - provider.stores("window-store", QueryableStoreTypes.windowStore()); + provider.stores("window-store", QueryableStoreTypes.windowStore(), false); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() { mockThread(true); taskOne.getStore("timestamped-window-store").close(); - provider.stores("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore()); + provider.stores("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore(), false); } @Test @@ -279,7 +277,7 @@ public void shouldReturnEmptyListIfNoStoresFoundWithName() { mockThread(true); assertEquals( Collections.emptyList(), - provider.stores("not-a-store", QueryableStoreTypes.keyValueStore())); + provider.stores("not-a-store", QueryableStoreTypes.keyValueStore(), false)); } @Test @@ -287,14 +285,14 @@ public void shouldReturnEmptyListIfStoreExistsButIsNotOfTypeValueStore() { mockThread(true); assertEquals( Collections.emptyList(), - provider.stores("window-store", QueryableStoreTypes.keyValueStore()) + provider.stores("window-store", QueryableStoreTypes.keyValueStore(), false) ); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() { mockThread(false); - provider.stores("kv-store", QueryableStoreTypes.keyValueStore()); + provider.stores("kv-store", QueryableStoreTypes.keyValueStore(), false); } private StreamTask createStreamsTask(final StreamsConfig streamsConfig, @@ -321,8 +319,11 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, } private void mockThread(final boolean initialized) { - EasyMock.expect(threadMock.isRunningAndNotRebalancing()).andReturn(initialized); - EasyMock.expect(threadMock.tasks()).andStubReturn(tasks); + EasyMock.expect(threadMock.isRunning()).andReturn(initialized); + EasyMock.expect(threadMock.activeTasks()).andStubReturn(tasks); + EasyMock.expect(threadMock.state()).andReturn( + initialized ? StreamThread.State.RUNNING : StreamThread.State.PARTITIONS_ASSIGNED + ).anyTimes(); EasyMock.replay(threadMock); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index b864732098fd9..651d185eb7e34 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -56,7 +56,9 @@ public void before() { stubProviderTwo.addStore("window", new NoOpWindowStore()); wrappingStoreProvider = new WrappingStoreProvider( - Arrays.asList(stubProviderOne, stubProviderTwo)); + Arrays.asList(stubProviderOne, stubProviderTwo), + false + ); } @Test diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java index 75340713f5a92..09a6829386e3b 100644 --- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java +++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java @@ -19,26 +19,28 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; -import org.apache.kafka.streams.state.internals.StateStoreProvider; +import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -public class StateStoreProviderStub implements StateStoreProvider { +public class StateStoreProviderStub extends StreamThreadStateStoreProvider { private final Map stores = new HashMap<>(); private final boolean throwException; public StateStoreProviderStub(final boolean throwException) { - + super(null); this.throwException = throwException; } @SuppressWarnings("unchecked") @Override - public List stores(final String storeName, final QueryableStoreType queryableStoreType) { + public List stores(final String storeName, + final QueryableStoreType queryableStoreType, + final boolean includeStaleStores) { if (throwException) { throw new InvalidStateStoreException("store is unavailable"); }