Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6144: option to query restoring and standby #7962

Closed
wants to merge 1 commit into from
Closed

KAFKA-6144: option to query restoring and standby #7962

wants to merge 1 commit into from

Conversation

vvcephei
Copy link
Contributor

This is based on a temporary branch, which is mirrored from #7960.

I will delete the temporary branch once #7960 is merged and re-target this PR to trunk.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei
Copy link
Contributor Author

@vinothchandar & @brary, This is the second chunk from your PR #7868, which I reviewed and feel good about merging. I'm sending a message to the mailing list for KIP-535, since this PR reveals some small changes to the public API.

Copy link
Member

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall.. Bunch of cosmetic suggestions. but one clarification on correctness/test

*
* @param storeName name of the store to find
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
* @param queryStaleData If false, only permit queries on the leader replica for a partition, and only if the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use active instead of leader to stay consistent with streams terminology?

*
* @param storeProvider provides access to all the underlying StateStore instances
* @param storeName The name of the Store
* @param queryStaleState Whether to allow querying recovering and standby stores.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recovering -> restoring? (recovering talks about task state and not store state?)

*/
public <T> T store(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final boolean queryStaleData) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

standardize on queryStaleState vs queryStaleData?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops. slipped past me.

* @param storeName name of the store
* @param queryableStoreType accept stores passing {@link QueryableStoreType#accepts(StateStore)}
* @param <T> The expected type of the returned store
* @param includeStandbyAndRecovering if true, include standbys and recovering stores;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason to swtich to includeStandbyAndRecovering? could we keep queryStaleState?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had been thinking about it as a directive internally about which stores to bundle, versus an API statement about which consistency level to query at, but upon reflection, even the API is just getting a store, not actually doing a query. I'm standardizing both terms to be the same.

} else {
stores.add((T) store);
final StreamThread.State state = streamThread.state();
if (includeStandbyAndRecovering ? state.isRunning() : state == StreamThread.State.RUNNING) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is creative :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state.isRunning() is as below

public boolean isRunning() {
            return equals(RUNNING) || equals(STARTING) || equals(PARTITIONS_REVOKED) || equals(PARTITIONS_ASSIGNED);
        }

if its not too much work, we can rename to something like state.isAlive() , that captures what we want to check .. your call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case where includeStandbyAndRecovering=false and the state is RUNNING.. We still want to disallow queries on standbys? from code below, it seems like we will enter the if block and loop over all active and standby tasks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really nice catch! Thanks.

Regarding isRunning, I had a similar thought, but couldn't remember if StreamThread.State is exposed in the public API. After some code analysis, I don't think it is, so I've renamed it.


waitForApplicationState(streamsList.subList(1, numThreads), State.NOT_RUNNING, Duration.ofSeconds(60));

// Now, confirm that all the keys are still queryable on the remaining thread, regardless of the state
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To close out the earlier thread.. This test is okay, since NOT_RUNNING will make that instance go to DEAD state (or some non functional state like that) where the store cannot be obtained.. the lines below check that we can stil retrieve the keys from the other replica

@@ -64,8 +64,9 @@ public void before() {

theStore = new CompositeReadOnlyKeyValueStore<>(
new WrappingStoreProvider(Arrays.<StateStoreProvider>asList(stubProviderOne, stubProviderTwo)),
QueryableStoreTypes.<String, String>keyValueStore(),
storeName);
QueryableStoreTypes.<String, String>keyValueStore(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the kafka way to align this with the ( .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not fixed in the style guide. The only thing is says is either to put all params on a single line or put each param on a separate line.

What I've seen (which I think makes sense) is, if the line's not too long, and it doesn't hinder readability, you can either do this:

myMethod(one
         two,
         three);

or

myMethod(
  one,
  two,
  three
);

Experience says that either one might be more readable in different circumstances, so it makes sense to allow both. They're both equally obvious regarding where all the params are and where the method call ends. The latter one uses more vertical space for simple method calls, but is more friendly when some of the arguments are themselves method calls.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. I was more curious about this, than anything.. Personally, the latter is very natural and easy to do. I just go with the flow, in these things. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I also prefer the latter. Thanks for asking! It's way easier to review code that already conforms to the norm, whether or not it's required by the style guide.

@@ -509,6 +509,94 @@ public void queryOnRebalance() throws Exception {
}
}

@Test
public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tthese verifyAllKVKeys methods will call final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.keyValueStore());, which should fail by default on the standby? (given thats changed from original PR)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth having two tests for both modes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's correct. The test does fail on this version of the code. Nice catch!

The test for active replicas would pass regardless, it's just a question of whether it returns the data during recovery or after the transition to running. But the test you added for standbys would only pass if we allow querying stale state.

I'm not sure it makes sense to add a negative test for standbys, though, since there are already tests ensuring that we throw the appropriate exceptions when using the old method (with a default of no stale stores). WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense... I will take another look at this when I rebase against the lag PR

Copy link
Member

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@vvcephei vvcephei changed the base branch from temp-kafka-6144-key-query-metadata to trunk January 15, 2020 15:50
Implements: KIP-535
Co-authored-by: Navinder Pal Singh Brar <[email protected]>
Reviewed-by: John Roesler <[email protected]>
Copy link
Member

@vinothchandar vinothchandar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just few comments on interfaces.. This looks much simpler though

*
* @param storeName name of the store to find
* @param queryableStoreType accept only stores that are accepted by {@link QueryableStoreType#accepts(StateStore)}
* @param includeStaleStores If false, only permit queries on the active replica for a partition, and only if the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cosmetic: extra space at the start

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I noticed that, too. Oh well.

final List<T> globalStore = globalStoreProvider.stores(storeName, queryableStoreType);
if (!globalStore.isEmpty()) {
return queryableStoreType.create(new WrappingStoreProvider(singletonList(globalStoreProvider)), storeName);
return queryableStoreType.create(globalStoreProvider, storeName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking aloud: guess there is nt much value in wrapping a single provider.. so +1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the role of the wrapping store provider really is nothing more than just iterating over the list of providers. For global stores, there's always exactly one provider, so it's purely a performance penalty for no gain at all.

/**
* A wrapper over all of the {@link StateStoreProvider}s in a Topology
*/
public class QueryableStoreProvider {

private final List<StateStoreProvider> storeProviders;
private final List<StreamThreadStateStoreProvider> storeProviders;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking aloud: even though we are binding to a specific StateStoreProvider implementation here, it seems fine,since there are n't any other really in a topology

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is part of the point of this change, actually, StreamThreadStateStoreProvider is no longer a StateStoreProvider. If you need a StateStoreProvider, you have to adapt it with WrappingStoreProvider.

* Wrapper over StreamThread that implements StateStoreProvider
*/
public class StreamThreadStateStoreProvider implements StateStoreProvider {
public class StreamThreadStateStoreProvider {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, just Global and Wrapping StateStoreProviders exist? IIUC, most of the xxxStore classes are just accessing the Wrapping.. store provider? Makes me wonder, if we should just use the QueryableStoreProvider everywhere and cull the interface..

Anyway, I am not familiar enough with this part of the code. So I leave it to you..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also feel that we can collapse the layered interfaces a bit further after we've changed this, since the original motivation of having it is just to "stitch" the global stores and local stores together when exposing as KafkaStreams#stores.

We can consider that in a separate, cleanup PR afterwards.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, StateStoreProvider is used in the public API as part of QueryableStoreType. Its role is well defined already. That's the one that we should continue to use "everywhere".

QueryableStoreProvider is a utility for selecting either the global store to return OR packaging the thread store providers into a WrappingStoreProvider and using it to actually create a usable store. It's not a StateStoreProvider at all.

WrappingStoreProvider on the other hand is a StateStoreProvider, and adapts a List<StreamThreadStoreProvider> to the StateStoreProvider interface.

GlobalStoreProvider is also a StateStoreProvider.

So, if anything, we could increase clarity by renaming some of these classes to more accurately reflect their roles, but all the involved classes have well defined and necessary roles right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang I think we wrote our comments at the same time. Just to respond directly, I do think that we can clean up the code base in a follow-on change (by renaming stuff that's named misleadingly now), but I don't think we can collapse the hierarchy at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! SGTM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can work on the follow-on change.

WrappingStoreProvider -> WrappingStateStoreProvider
GlobalStoreProvider -> GlobalStateStoreProvider

QueryableStoreProvider is left as is.

@@ -352,7 +347,7 @@ private void verifyAllWindowedKeys(final List<KafkaStreams> streamsList,
final int index = queryMetadata.getActiveHost().port();
final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams;
final ReadOnlyWindowStore<String, Long> store =
streamsWithKey.store(storeName, QueryableStoreTypes.windowStore());
streamsWithKey.store(storeName, QueryableStoreTypes.windowStore(), true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool. this should now allow standby to be queried

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep!

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: in this PR we did not make changes to QueryableStoreType as discussed in the mailing thread, is that going to be in a different PR?

* Wrapper over StreamThread that implements StateStoreProvider
*/
public class StreamThreadStateStoreProvider implements StateStoreProvider {
public class StreamThreadStateStoreProvider {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also feel that we can collapse the layered interfaces a bit further after we've changed this, since the original motivation of having it is just to "stitch" the global stores and local stores together when exposing as KafkaStreams#stores.

We can consider that in a separate, cleanup PR afterwards.

@vvcephei
Copy link
Contributor Author

Thanks for the review @vinothchandar ! I think you've pointed out some good opportunities to improve the clarity of the internal implementation, but I'd like to defer that for now, so that we can continue to make progress on #7868

@vvcephei
Copy link
Contributor Author

@guozhangwang , that is correct. It turned out that there was some abuse of the class hierarchy in the code base, which, once removed, means that we didn't have to change the QueryableStoreType interface. There will be no follow up.

@guozhangwang
Copy link
Contributor

LGTM. Please feel free to merge.

@vvcephei
Copy link
Contributor Author

Thanks, all!

The one test failure was in org.apache.kafka.streams.integration.BranchedMultiLevelRepartitionConnectedTopologyTest.testTopologyBuild, due to the test not being able to connect to its broker within the timeout.

I'm proceeding to merge this.

@vvcephei vvcephei closed this in 0c76fbb Jan 15, 2020
@vvcephei vvcephei deleted the kafka-6144-query-restoring-and-standby branch January 15, 2020 19:49
ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 21, 2020
Conflicts or compilation errors due to the fact that we temporarily
reverted the commit that removes Scala 2.11 support:

* AclCommand.scala: take upstream changes.
* AclCommandTest.scala: take upstream changes.
* TransactionCoordinatorTest.scala: don't use SAMs, but adjust
mock call to putTransactionStateIfNotExists given new signature.
* TransactionStateManagerTest: use Runnable instead of SAMs.
* PartitionLockTest: use Runnable instead of SAMs.
* docs/upgrade.html: take upstream changes excluding line that
states that Scala 2.11 support has been removed.

* apache-github/trunk: (28 commits)
  KAFKA-9457; Fix flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose (apache#7989)
  MINOR: Update AclCommand help message to match implementation (apache#7990)
  MINOR: Update introduction page in Kafka documentation
  MINOR: Use Math.min for StreamsPartitionAssignor#updateMinReceivedVersion method (apache#7954)
  KAFKA-9338; Fetch session should cache request leader epoch (apache#7970)
  KAFKA-9329; KafkaController::replicasAreValid should return error message (apache#7865)
  KAFKA-9449; Adds support for closing the producer's BufferPool. (apache#7967)
  MINOR: Handle expandIsr in PartitionLockTest and ensure read threads not blocked on write (apache#7973)
  MINOR: Fix typo in connect integration test class name (apache#7976)
  KAFKA-9218: MirrorMaker 2 can fail to create topics (apache#7745)
  KAFKA-8847; Deprecate and remove usage of supporting classes in kafka.security.auth (apache#7966)
  MINOR: Suppress DescribeConfigs Denied log during CreateTopics (apache#7971)
  [MINOR]: Fix typo in Fetcher comment (apache#7934)
  MINOR: Remove unnecessary call to `super` in `MetricConfig` constructor (apache#7975)
  MINOR: fix flaky StreamsUpgradeTestIntegrationTest (apache#7974)
  KAFKA-9431: Expose API in KafkaStreams to fetch all local offset lags (apache#7961)
  KAFKA-9235; Ensure transaction coordinator is stopped after replica deletion (apache#7963)
  KAFKA-9410; Make groupId Optional in KafkaConsumer (apache#7943)
  MINOR: Removed accidental double negation in error message. (apache#7834)
  KAFKA-6144: IQ option to query standbys (apache#7962)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants