diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 3be5de9b4..1bd9ca076 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -14,6 +14,12 @@ ifndef::github_name[] toc::[] endif::[] +== 0.5.2.5 + +=== Fixes + +* fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer (#469) + == 0.5.2.4 === Improvements diff --git a/README.adoc b/README.adoc index ee5df004d..e4efd7a9f 100644 --- a/README.adoc +++ b/README.adoc @@ -1311,6 +1311,12 @@ ifndef::github_name[] toc::[] endif::[] +== 0.5.2.5 + +=== Fixes + +* fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer (#469) + == 0.5.2.4 === Improvements diff --git a/RELEASE.adoc b/RELEASE.adoc index ad63e9731..65994e35d 100644 --- a/RELEASE.adoc +++ b/RELEASE.adoc @@ -5,9 +5,11 @@ `release:prepare -DautoVersionSubmodules=true -DpushChanges=false -Darguments=-DskipTests -Pci` -- Push the plugin's commits one at a time (otherwise need to request Jenkins to build the release tag - so this is a minor shortcut) +- Push the master branch with release and tag +- Trigger master builder to build the tag (this is needed to trigger the deployment flow) - Wait for Jenkins to finish running the build (~15 minutes) -- Wait for Sonatype to publish from it's staging area (~15 minutes) +- Wait for Sonatype to publish from its staging area (~15 minutes) https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/[repo1 link] +- Verify the release is available on Maven Central https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/[repo1 link] - Create the release on GH from the tag - Paste in the details from the changelog, save, share as discussion -- Announce on slack(s), mailing list, \ No newline at end of file +- Announce on slack (community #clients and internal channels), mailing list, twitter \ No newline at end of file diff --git 
a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 8f3b2bbcb..f94af66db 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -37,6 +37,7 @@ import static io.confluent.csid.utils.BackportUtils.toSeconds; import static io.confluent.csid.utils.StringUtils.msg; import static io.confluent.parallelconsumer.internal.State.*; +import static java.lang.Boolean.TRUE; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static lombok.AccessLevel.PRIVATE; @@ -406,28 +407,34 @@ public void onPartitionsLost(Collection partitions) { * Nasty reflection to check if auto commit is disabled. *

* Other way would be to politely request the user also include their consumer properties when construction, but - * this is more reliable in a correctness sense, but britle in terms of coupling to internal implementation. + * this is more reliable in a correctness sense, but brittle in terms of coupling to internal implementation. * Consider requesting ability to inspect configuration at runtime. */ - @SneakyThrows private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consumer consumer) { - if (consumer instanceof KafkaConsumer) { - // Commons lang FieldUtils#readField - avoid needing commons lang - Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); //NoSuchFieldException - coordinatorField.setAccessible(true); - ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException - - if (coordinator == null) - throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?"); - - Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled"); - autoCommitEnabledField.setAccessible(true); - Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator); - - if (isAutoCommitEnabled) - throw new IllegalArgumentException("Consumer auto commit must be disabled, as commits are handled by the library."); - } else { - // noop - probably MockConsumer being used in testing - which doesn't do auto commits + try { + if (consumer instanceof KafkaConsumer) { + // Could use Commons Lang FieldUtils#readField - but, avoid needing commons lang + Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator"); + coordinatorField.setAccessible(true); + ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException + + if (coordinator == null) + throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? 
Reflection broken?"); + + Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled"); + autoCommitEnabledField.setAccessible(true); + Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator); + + if (TRUE.equals(isAutoCommitEnabled)) + throw new ParallelConsumerException("Consumer auto commit must be disabled, as commits are handled by the library."); + } else if (consumer instanceof MockConsumer) { + log.debug("Detected MockConsumer class which doesn't do auto commits"); + } else { + // Probably Mockito + log.error("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName()); + } + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalStateException("Cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName(), e); } } diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CustomConsumersTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CustomConsumersTest.java new file mode 100644 index 000000000..4dee90d70 --- /dev/null +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CustomConsumersTest.java @@ -0,0 +1,47 @@ +package io.confluent.parallelconsumer.integrationTests; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +/** + * @author Antony Stubbs + */ +class CustomConsumersTest extends BrokerIntegrationTest { + + /** + * Tests that extended consumer can be used with a custom consumer with PC. + *

+ * Test for issue #195 - https://github.com/confluentinc/parallel-consumer/issues/195 + * + * @see io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor#checkAutoCommitIsDisabled + */ + @Test + void extendedConsumer() { // NOSONAR + Properties properties = getKcu().setupConsumerProps(this.getClass().getSimpleName()); + CustomConsumer client = new CustomConsumer<>(properties); + + ParallelConsumerOptions options = ParallelConsumerOptions.builder() + .consumer(client) + .build(); + + ParallelEoSStreamProcessor pc = new ParallelEoSStreamProcessor<>(options); + } + + static class CustomConsumer extends KafkaConsumer { + + String customField = "custom"; + + public CustomConsumer(Properties configs) { + super(configs); + } + + } +} \ No newline at end of file diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java index 247f01b6e..bcc71436d 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java @@ -109,7 +109,7 @@ private Properties setupProducerProps() { return producerProps; } - private Properties setupConsumerProps(String groupIdToUse) { + public Properties setupConsumerProps(String groupIdToUse) { var consumerProps = setupCommonProps(); // diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java index 35f56bcdf..ff7852f0f 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java +++ 
b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java @@ -823,7 +823,7 @@ void optionsGroupIdRequiredAndAutoCommitDisabled() { optionsBuilder.consumer(new KafkaConsumer<>(properties, deserializer, deserializer)); assertThat(catchThrowable(() -> parallelConsumer = initPollingAsyncConsumer(optionsBuilder.build()))) .as("Should error on auto commit enabled by default") - .isInstanceOf(IllegalArgumentException.class) + .isInstanceOf(ParallelConsumerException.class) .hasMessageContainingAll("auto", "commit", "disabled"); // fail auto commit disabled @@ -831,7 +831,7 @@ void optionsGroupIdRequiredAndAutoCommitDisabled() { optionsBuilder.consumer(new KafkaConsumer<>(properties, deserializer, deserializer)); assertThat(catchThrowable(() -> parallelConsumer = initPollingAsyncConsumer(optionsBuilder.build()))) .as("Should error on auto commit enabled") - .isInstanceOf(IllegalArgumentException.class) + .isInstanceOf(ParallelConsumerException.class) .hasMessageContainingAll("auto", "commit", "disabled"); // set missing auto commit