
MINOR: prevent cleanup() from being called while Streams is still shutting down #10666

Conversation

ableegoldman
Member

Currently KafkaStreams#cleanUp only throws an IllegalStateException if the state is RUNNING or REBALANCING. However, the application could be in the process of shutting down, in which case StreamThreads may still be running. We should also throw if the state is PENDING_ERROR or PENDING_SHUTDOWN.
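
Roughly, the tightened guard might look like the following (a sketch only, not the exact diff from this PR; the State values are the real KafkaStreams.State names, but the method body and message text are illustrative):

    public void cleanUp() {
        final State state = state();
        // Reject cleanup not only while running, but also while a shutdown
        // is in progress, since StreamThreads may still hold directory locks.
        if (state == State.RUNNING
                || state == State.REBALANCING
                || state == State.PENDING_SHUTDOWN
                || state == State.PENDING_ERROR) {
            throw new IllegalStateException("Cannot clean up while running or shutting down.");
        }
        stateDirectory.clean();
    }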

gardnervickers pushed a commit to gardnervickers/kafka-1 that referenced this pull request May 12, 2021
…er (apache#10666)

Previously all APIs were accessible on every listener exposed by the broker, but
with KIP-500, that is no longer true.  We now have more complex requirements for
API accessibility.

For example, the KIP-500 controller exposes some APIs which are not exposed by
brokers, such as BrokerHeartbeatRequest, and does not expose most client APIs,
such as JoinGroupRequest, etc.  Similarly, the KIP-500 broker does not implement
some APIs that the ZK-based broker does, such as LeaderAndIsrRequest and
UpdateFeaturesRequest.

All of this means that we need more sophistication in how we expose APIs and
keep them consistent with the ApiVersions API. Up until now, we have been
working around this using the controllerOnly flag inside ApiKeys, but this is
not rich enough to support all of the cases listed above.  This PR introduces a
new "listeners" field to the request schema definitions.  This field is an array
of strings which indicate the listener types in which the API should be exposed.
We currently support "zkBroker", "broker", and "controller".  ("broker"
indicates the KIP-500 broker, whereas zkBroker indicates the old broker).

This PR also creates ApiVersionManager to encapsulate the creation of the
ApiVersionsResponse based on the listener type.  Additionally, it modifies
SocketServer to check the listener type of received requests before forwarding
them to the request handler.

Finally, this PR also fixes a bug in the handling of the ApiVersionsResponse
prior to authentication. Previously a static response was sent, which means that
changes to features would not get reflected. This also meant that the logic to
ensure that only the intersection of version ranges supported by the controller
would get exposed did not work. I think this is important because some clients
rely on the initial pre-authenticated ApiVersions response rather than doing a
second round after authentication as the Java client does.

One final cleanup note: I have removed the expectation that envelope requests
are only allowed on "privileged" listeners.  This made sense initially because
we expected to use forwarding before the KIP-500 controller was available. That
is not the case anymore and we expect the Envelope API to only be exposed on the
controller listener. I have nevertheless preserved the existing workarounds to
allow verification of the forwarding behavior in integration testing.

Reviewers: Colin P. McCabe <[email protected]>, Ismael Juma <[email protected]>
@ableegoldman
Member Author

cc @guozhangwang @cadonna @wcarlson5 @lct45 to review

Contributor
@wcarlson5 wcarlson5 left a comment

Good to get this cleaned up!

Comment on lines -173 to -174
streamInstanceOne.close(Duration.ZERO);
streamInstanceTwo.close(Duration.ZERO);
Member Author

This test failing does give me some small pause about just throwing an IllegalStateException in the case of PENDING_SHUTDOWN/ERROR -- do we think many users follow this pattern of calling the non-blocking close and then immediately calling cleanUp()? Maybe the better question is: should we support this, or is it alright to just enforce that users wait for Streams to shut down before invoking cleanUp? If we do want to support this, we could signal to the shutdown thread that it should re-invoke cleanUp once Streams has finished shutting down, instead of just throwing an IllegalStateException. Not sure if it's really worth it or not, though -- thoughts @guozhangwang @wcarlson5 ?
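
For reference, the pattern in question (as in the test above) is, roughly:

    // Non-blocking close: returns immediately while StreamThreads may still be running
    streamInstanceOne.close(Duration.ZERO);
    // With this PR the instance is still in PENDING_SHUTDOWN here, so this
    // now throws an IllegalStateException rather than proceeding
    streamInstanceOne.cleanUp();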

Contributor

How hard would it be to have cleanUp() set a flag for the shutdown thread to call clean up after it's done? We want to make sure the shutdown thread checks the flag after it changes the state to NOT_RUNNING or ERROR.

I think this is a much more flexible pattern that would be good to support. If Streams is running or rebalancing we would still want to throw an IllegalStateException, or maybe an IllegalOperationException. Either way, it would be good to test when Streams is in a PENDING state, though I think you would have to make that a unit test.
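
A rough sketch of that idea (the flag and the shutdown-thread hook below are invented names for illustration, not code from this PR):

    private final AtomicBoolean pendingCleanUp = new AtomicBoolean(false);

    public void cleanUp() {
        final State state = state();
        if (state == State.RUNNING || state == State.REBALANCING) {
            throw new IllegalStateException("Cannot clean up while running.");
        }
        if (state == State.PENDING_SHUTDOWN || state == State.PENDING_ERROR) {
            // Defer: the shutdown thread re-checks this flag after it has
            // transitioned the state to NOT_RUNNING or ERROR.
            pendingCleanUp.set(true);
            return;
        }
        stateDirectory.clean();
    }

    // ...and at the very end of the shutdown thread, after the state transition:
    //     setState(State.NOT_RUNNING);
    //     if (pendingCleanUp.getAndSet(false)) {
    //         stateDirectory.clean();
    //     }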

Member Author

I already have a unit test in this PR for the PENDING case, so I should be able to adapt it to the new behavior without much trouble if we want to change it. I also don't think it would be that difficult to implement this behavior -- I guess it was more a question of whether we should handle this semi-transparently or just tell the user to be patient and try again after the close. For example, at the moment close() launches the shutdown helper as a daemon thread, so if the user does a non-blocking close followed by cleanUp and that's the end of the app, most likely the cleanup will not occur (nor will the app complete its shutdown).

Member Author

I guess another alternative is to have cleanUp block until Streams finishes shutting down, but I don't think we want it to essentially block on the results of close(), at least not without giving users a way to supply a timeout -- and I don't think it's worth doing an entire KIP just to add an overload with one.
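
For illustration, such an overload might look like this (hypothetical -- adding it would need a KIP, and waitForShutdown is an invented stand-in for whatever mechanism would block until the state reaches NOT_RUNNING or ERROR):

    public void cleanUp(final Duration timeout) {
        if (!waitForShutdown(timeout)) {
            throw new IllegalStateException("Streams did not shut down within " + timeout);
        }
        stateDirectory.clean();
    }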

Contributor

I think if they call a non-blocking close, then clean up, and then later notice the leftovers, that is not a great experience, and if we can prevent it we should. I agree it's probably not worth a KIP though.

Contributor

At the moment, if a user calls a close() with no blocking and then cleanUp, the latter cleanUp could become a no-op because the locks are not released yet, and the user won't know; whereas now it would throw an exception saying that close is not done yet, right? Plus, our javadoc of cleanUp states that

     * Do a clean up of the local {@link StateStore} directory ({@link StreamsConfig#STATE_DIR_CONFIG}) by deleting all
     * data with regard to the {@link StreamsConfig#APPLICATION_ID_CONFIG application ID}.
     * <p>
     * May only be called either before this {@code KafkaStreams} instance is {@link #start() started} or after the
     * instance is {@link #close() closed}.

Hence I think the new behavior is not really a regression but rather a fix forward.
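
In other words, the lifecycle the javadoc permits looks like this (a minimal sketch; topology and props assumed):

    final KafkaStreams streams = new KafkaStreams(topology, props);
    streams.cleanUp();   // OK: the instance has not been started yet
    streams.start();
    // ... process records ...
    streams.close();     // blocking close: waits for all threads to shut down
    streams.cleanUp();   // OK: the instance is fully closed and locks are released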

Contributor
@guozhangwang guozhangwang left a comment

The change LGTM.

}
Utils.delete(taskDir);
Contributor

Ah that's a nice one: we are sure all threads are gone now so there's no need to lock :)


@ableegoldman
Member Author

Just some unrelated flaky tests:
kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
kafka.connect.integration.ConnectorClientPolicyIntegrationTest.testCreateWithOverridesForNonePolicy()
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()

@ableegoldman ableegoldman merged commit 4153e75 into apache:trunk May 13, 2021
jwijgerd pushed a commit to jwijgerd/kafka that referenced this pull request May 15, 2021
…er (apache#10666)

ijuma added a commit to ijuma/kafka that referenced this pull request May 16, 2021
…e-allocations-lz4

* apache-github/trunk: (155 commits)
  KAFKA-12728: Upgrade gradle to 7.0.2 and shadow to 7.0.0 (apache#10606)
  KAFKA-12778: Fix QuorumController request timeouts and electLeaders (apache#10688)
  KAFKA-12754: Improve endOffsets for TaskMetadata (apache#10634)
  Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created (apache#10680)
  MINOR: set replication.factor to 1 to make StreamsBrokerCompatibilityService work with old broker (apache#10673)
  MINOR: prevent cleanup() from being called while Streams is still shutting down (apache#10666)
  KAFKA-8326: Introduce List Serde (apache#6592)
  KAFKA-12697: Add Global Topic and Partition count metrics to the Quorum Controller (apache#10679)
  KAFKA-12648: MINOR - Add TopologyMetadata.Subtopology class for subtopology metadata (apache#10676)
  MINOR: Update jacoco to 0.8.7 for JDK 16 support (apache#10654)
  MINOR: exclude all `src/generated` and `src/generated-test` (apache#10671)
  KAFKA-12772: Move all transaction state transition rules into their states (apache#10667)
  KAFKA-12758 Added `server-common` module to have server side common classes.  (apache#10638)
  MINOR Removed copying storage libraries specifically as they are already copied. (apache#10647)
  KAFKA-5876: KIP-216 Part 4, Apply InvalidStateStorePartitionException for Interactive Queries (apache#10657)
  KAFKA-12747: Fix flakiness in shouldReturnUUIDsWithStringPrefix (apache#10643)
  MINOR: remove unnecessary placeholder from WorkerSourceTask#recordSent (apache#10659)
  MINOR: Remove unused `scalatest` definition from `dependencies.gradle` (apache#10655)
  MINOR: checkstyle version upgrade: 8.20 -> 8.36.2 (apache#10656)
  KAFKA-12464: minor code cleanup and additional logging in constrained sticky assignment (apache#10645)
  ...