Kafka upgrade #881
Overall looks good, except one comment.
.../src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java
datastream-kafka/src/main/java/com/linkedin/datastream/kafka/EmbeddedKafkaCluster.java
LGTM
@@ -766,7 +769,7 @@ protected void updateConsumerAssignment(Collection<TopicPartition> partitions) {
  public void onPartitionsRevoked(Collection<TopicPartition> topicPartitions) {
KafkaMirrorMakerConnectorTask overrides this method. You will have to fix the logic there as well; without that, this change is incomplete.
KafkaMirrorMakerConnectorTask calls super, and the same goes for onPartitionsRevoked, so I think the change in AbstractKafkaBasedConnectorTask should take care of it. Could you please confirm?
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
  super.onPartitionsRevoked(partitions);
  _topicManager.onPartitionsRevoked(partitions);
}

@Override
public void run() {
  if (_enablePartitionAssignment) {
    try {
      LOG.info("Trying to acquire the lock on datastreamTask: {}", _datastreamTask);
      _datastreamTask.acquire(LOCK_ACQUIRE_TIMEOUT);
    } catch (DatastreamRuntimeException ex) {
      LOG.error(String.format("Failed to acquire lock for datastreamTask %s", _datastreamTask), ex);
      _dynamicMetricsManager.createOrUpdateMeter(generateMetricsPrefix(_connectorName, CLASS_NAME), _datastreamName,
          TASK_LOCK_ACQUIRE_ERROR_RATE, 1);
      throw ex;
    }
  }
  super.run();
}
As the code stands today, yes, the base class change takes care of it. But it can also create bigger problems in the future. Because the override class relies on the logic living in the superclass rather than handling it itself, anyone who later changes the override and removes the super call will re-expose this problem. The current behavior depends heavily on how the code happens to be written, and that can change, so it is important to handle this scenario explicitly to make future debugging easier.
There are two ways to look at this:
a. TopicManager.onPartitionsRevoked will still be called, which can add processing time depending on the implementation of that interface. If we decide that the subsequent calls should not be made, then additional checks are required in the main class as well.
b. If it is not critical and we only want future users to know about it, it is better to add a comment in KafkaMirrorMakerConnectorTask warning not to remove the super.onPartitionsRevoked call, or to refactor the code to take care of the new Kafka client scenario.
I am very sure this knowledge will get lost in the future if there are not enough hints in the code.
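To make the "refactor" alternative in (b) concrete, here is a minimal sketch of one possible shape, not what this PR does. All names (BaseTaskSketch, MirrorTaskSketch, afterPartitionsRevoked, markFailed) are hypothetical, and plain strings stand in for TopicPartition so the example has no Kafka dependency. The base class makes the callback final and exposes a hook, so a subclass cannot lose the failure guard by dropping a super call, and the hook is skipped after a failure as in (a).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical base class: owns the failure guard so subclasses cannot bypass it.
abstract class BaseTaskSketch {
  private volatile boolean failed = false;
  final List<String> log = new ArrayList<>();

  void markFailed() {
    failed = true;
  }

  // final: a subclass cannot override this and forget to call super.
  public final void onPartitionsRevoked(Collection<String> partitions) {
    if (failed) {
      log.add("skipped");
      return; // neither the commit nor the subclass hook runs after a failure
    }
    log.add("committed");
    afterPartitionsRevoked(partitions);
  }

  // Hook for subclass-specific work; a no-op by default.
  protected void afterPartitionsRevoked(Collection<String> partitions) {
  }
}

// Hypothetical subclass: only implements the hook.
class MirrorTaskSketch extends BaseTaskSketch {
  @Override
  protected void afterPartitionsRevoked(Collection<String> partitions) {
    log.add("topicManager notified");
  }
}
```

The trade-off is a larger change to the class hierarchy than a code comment, so the comment-only option may still be the pragmatic choice for now.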
_topicManager.onPartitionsRevoked is pretty much a no-op today. We could revisit the overridden onPartitionsRevoked in KafkaMirrorMakerConnectorTask later, and remove it completely if needed.
For now, we can add comments in the overriding class as well, mirroring those in the base class, to explicitly state the behavior change in the upgraded Kafka client.
I will add this in a separate PR following this one. Thanks!
* Kafka upgrade
* Kafka upgrade
* Removing temp version
* Addressing comments: removing flag update after close to prevent threads misbehavior
* Removing dependency on li-apache-kafka-clients
The ConsumerRebalanceListener interface changed in the 2.4 release: a new method, onPartitionsLost(), was added, and if it is not overridden, its default behavior is to call onPartitionsRevoked(). As a result, closing the consumer now triggers the revoked-partitions callback, which causes testValidateFlushlessModeTaskDiesOnRewindFailure to fail. This was not the behavior prior to this version.
The fix introduces a flag that holds the failure state and uses it to skip the commit in the onPartitionsRevoked call.
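The interaction described above can be illustrated with a small, dependency-free sketch. RebalanceListenerSketch is a hypothetical stand-in for Kafka's ConsumerRebalanceListener, reduced to the two callbacks in question, with strings in place of TopicPartition. GuardedTaskSketch and its markFailed method are likewise hypothetical; this is an illustration of the idea, not the actual change in AbstractKafkaBasedConnectorTask.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for ConsumerRebalanceListener as of Kafka 2.4.
interface RebalanceListenerSketch {
  void onPartitionsRevoked(Collection<String> partitions);

  // Mirrors the default described above: unless overridden, lost partitions
  // are reported through onPartitionsRevoked.
  default void onPartitionsLost(Collection<String> partitions) {
    onPartitionsRevoked(partitions);
  }
}

// Hypothetical task: records a failure and skips the commit on revocation.
class GuardedTaskSketch implements RebalanceListenerSketch {
  private volatile boolean failed = false;
  final List<String> committed = new ArrayList<>();

  void markFailed() {
    failed = true;
  }

  @Override
  public void onPartitionsRevoked(Collection<String> partitions) {
    if (failed) {
      return; // the task already failed, so do not commit on the way out
    }
    committed.addAll(partitions); // stands in for committing checkpoints
  }
}
```

Calling onPartitionsLost on a task that has been marked failed goes through the default method into onPartitionsRevoked and commits nothing, while a healthy task commits as before.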
The Kafka team plans to deprecate open source li-apache-kafka-clients (http://github.com/linkedin/li-apache-kafka-clients) to streamline the client stack. Open source Brooklin currently depends on li-apache-kafka-clients and will need to directly depend on the LinkedIn version of open source Kafka clients (http://github.com/linkedin/kafka) instead before its full deprecation.
Auditor and LargeMessageSegment support is deprecated.