From 585b5842fcbd3af427b1c471ef12b9a2b0c7065a Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 3 Nov 2022 11:54:12 +0000 Subject: [PATCH 1/7] docs: Release instructions tweek --- RELEASE.adoc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/RELEASE.adoc b/RELEASE.adoc index ad63e9731..e9c10aeae 100644 --- a/RELEASE.adoc +++ b/RELEASE.adoc @@ -7,7 +7,8 @@ - Push the plugin's commits one at a time (otherwise need to request Jenkins to build the release tag - so this is a minor shortcut) - Wait for Jenkins to finish running the build (~15 minutes) -- Wait for Sonatype to publish from it's staging area (~15 minutes) +- Wait for Sonatype to publish from it's staging area (~15 minutes) https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/[repo1 link] +- Verify the release is available on Maven Central https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/[repo1 link] - Create the release on GH from the tag - Paste in the details from the changelog, save, share as discussion -- Announce on slack(s), mailing list, \ No newline at end of file +- Announce on slack (community #clients and internal channels), mailing list, twitter \ No newline at end of file From 95ee9b106d489f9d454dc3ea379e9e6c32a2229c Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 3 Nov 2022 12:50:08 +0000 Subject: [PATCH 2/7] fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer Corrects use of reflection for field access, by using the KafkaConsumer class directly, instead of trying to find it from the provided consumer. --- .../AbstractParallelEoSStreamProcessor.java | 42 +++++++++-------- .../integrationTests/CustomConsumersTest.java | 47 +++++++++++++++++++ .../utils/KafkaClientUtils.java | 2 +- 3 files changed, 71 insertions(+), 20 deletions(-) create mode 100644 parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CustomConsumersTest.java diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 8f3b2bbcb..c39664454 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -406,28 +406,32 @@ public void onPartitionsLost(Collection partitions) { * Nasty reflection to check if auto commit is disabled. *

* Other way would be to politely request the user also include their consumer properties when construction, but - * this is more reliable in a correctness sense, but britle in terms of coupling to internal implementation. + * this is more reliable in a correctness sense, but brittle in terms of coupling to internal implementation. * Consider requesting ability to inspect configuration at runtime. */ - @SneakyThrows private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consumer consumer) { - if (consumer instanceof KafkaConsumer) { - // Commons lang FieldUtils#readField - avoid needing commons lang - Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); //NoSuchFieldException - coordinatorField.setAccessible(true); - ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException - - if (coordinator == null) - throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?"); - - Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled"); - autoCommitEnabledField.setAccessible(true); - Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator); - - if (isAutoCommitEnabled) - throw new IllegalArgumentException("Consumer auto commit must be disabled, as commits are handled by the library."); - } else { - // noop - probably MockConsumer being used in testing - which doesn't do auto commits + try { + if (consumer instanceof KafkaConsumer) { + Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator"); + coordinatorField.setAccessible(true); + ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException + + if (coordinator == null) + throw new IllegalStateException("Coordinator for Consumer is null - missing GroupId? Reflection broken?"); + + Field autoCommitEnabledField = coordinator.getClass().getDeclaredField("autoCommitEnabled"); + autoCommitEnabledField.setAccessible(true); + Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator); + + if (isAutoCommitEnabled) + throw new ParallelConsumerException("Consumer auto commit must be disabled, as commits are handled by the library."); + } else if (consumer instanceof MockConsumer) { + log.debug("Detected MockConsumer class which doesn't do auto commits"); + } else { + throw new UnsupportedOperationException("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName()); + } + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalStateException("Cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName(), e); } } diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CustomConsumersTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CustomConsumersTest.java new file mode 100644 index 000000000..4dee90d70 --- /dev/null +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/CustomConsumersTest.java @@ -0,0 +1,47 @@ +package io.confluent.parallelconsumer.integrationTests; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelEoSStreamProcessor; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +/** + * @author Antony Stubbs + */ +class CustomConsumersTest extends BrokerIntegrationTest { + + /** + * Tests that extended consumer can be used with a custom consumer with PC. + *

+ * Test for issue #195 - https://github.com/confluentinc/parallel-consumer/issues/195 + * + * @see io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor#checkAutoCommitIsDisabled + */ + @Test + void extendedConsumer() { // NOSONAR + Properties properties = getKcu().setupConsumerProps(this.getClass().getSimpleName()); + CustomConsumer client = new CustomConsumer<>(properties); + + ParallelConsumerOptions options = ParallelConsumerOptions.builder() + .consumer(client) + .build(); + + ParallelEoSStreamProcessor pc = new ParallelEoSStreamProcessor<>(options); + } + + static class CustomConsumer extends KafkaConsumer { + + String customField = "custom"; + + public CustomConsumer(Properties configs) { + super(configs); + } + + } +} \ No newline at end of file diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java index 247f01b6e..bcc71436d 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/utils/KafkaClientUtils.java @@ -109,7 +109,7 @@ private Properties setupProducerProps() { return producerProps; } - private Properties setupConsumerProps(String groupIdToUse) { + public Properties setupConsumerProps(String groupIdToUse) { var consumerProps = setupCommonProps(); // From 8d251ff98b1a6faa3cbab22985e4a59493e186b0 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 3 Nov 2022 13:01:00 +0000 Subject: [PATCH 3/7] review --- CHANGELOG.adoc | 6 ++++++ README.adoc | 6 ++++++ .../internal/AbstractParallelEoSStreamProcessor.java | 1 + 3 files changed, 13 insertions(+) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index 3be5de9b4..1bd9ca076 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -14,6 +14,12 @@ ifndef::github_name[] toc::[] endif::[] +== 0.5.2.5 + +=== Fixes + +* fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer (#469) + == 0.5.2.4 === Improvements diff --git a/README.adoc b/README.adoc index ee5df004d..e4efd7a9f 100644 --- a/README.adoc +++ b/README.adoc @@ -1311,6 +1311,12 @@ ifndef::github_name[] toc::[] endif::[] +== 0.5.2.5 + +=== Fixes + +* fixes: #195 NoSuchFieldException when using consumer inherited from KafkaConsumer (#469) + == 0.5.2.4 === Improvements diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index c39664454..c44552f3a 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -412,6 +412,7 @@ public void onPartitionsLost(Collection partitions) { private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consumer consumer) { try { if (consumer instanceof KafkaConsumer) { + // Could use Commons Lang FieldUtils#readField - but, avoid needing commons lang Field coordinatorField = KafkaConsumer.class.getDeclaredField("coordinator"); coordinatorField.setAccessible(true); ConsumerCoordinator coordinator = (ConsumerCoordinator) coordinatorField.get(consumer); //IllegalAccessException From b5354abc8c58f242c57a317df481c46f07a34193 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 3 Nov 2022 13:04:30 +0000 Subject: [PATCH 4/7] review --- RELEASE.adoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/RELEASE.adoc b/RELEASE.adoc index e9c10aeae..65994e35d 100644 --- a/RELEASE.adoc +++ b/RELEASE.adoc @@ -5,7 +5,8 @@ `release:prepare -DautoVersionSubmodules=true -DpushChanges=false -Darguments=-DskipTests -Pci` -- Push the plugin's commits one at a time (otherwise need to request Jenkins to build the release tag - so this is a minor shortcut) +- Push the master branch with release and tag +- Trigger master builder to build the tag (this is needed to trigger the deployment flow) - Wait for Jenkins to finish running the build (~15 minutes) - Wait for Sonatype to publish from it's staging area (~15 minutes) https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/[repo1 link] - Verify the release is available on Maven Central https://repo1.maven.org/maven2/io/confluent/parallelconsumer/parallel-consumer-parent/[repo1 link] From beb5213b44a796a57fe077b5862e198dcd10f8e7 Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 3 Nov 2022 13:41:29 +0000 Subject: [PATCH 5/7] review --- .../internal/AbstractParallelEoSStreamProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index c44552f3a..93746075f 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -426,7 +426,7 @@ private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consume if (isAutoCommitEnabled) throw new ParallelConsumerException("Consumer auto commit must be disabled, as commits are handled by the library."); - } else if (consumer instanceof MockConsumer) { + } else if (consumer instanceof MockConsumer) { log.debug("Detected MockConsumer class which doesn't do auto commits"); } else { throw new UnsupportedOperationException("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName()); From bd9aad0df9167a3c414138881587831e51ace51b Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Mon, 21 Nov 2022 13:40:10 +0000 Subject: [PATCH 6/7] review --- .../parallelconsumer/ParallelEoSStreamProcessorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java index 35f56bcdf..ff7852f0f 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessorTest.java @@ -823,7 +823,7 @@ void optionsGroupIdRequiredAndAutoCommitDisabled() { optionsBuilder.consumer(new KafkaConsumer<>(properties, deserializer, deserializer)); assertThat(catchThrowable(() -> parallelConsumer = initPollingAsyncConsumer(optionsBuilder.build()))) .as("Should error on auto commit enabled by default") - .isInstanceOf(IllegalArgumentException.class) + .isInstanceOf(ParallelConsumerException.class) .hasMessageContainingAll("auto", "commit", "disabled"); // fail auto commit disabled @@ -831,7 +831,7 @@ void optionsGroupIdRequiredAndAutoCommitDisabled() { optionsBuilder.consumer(new KafkaConsumer<>(properties, deserializer, deserializer)); assertThat(catchThrowable(() -> parallelConsumer = initPollingAsyncConsumer(optionsBuilder.build()))) .as("Should error on auto commit enabled") - .isInstanceOf(IllegalArgumentException.class) + .isInstanceOf(ParallelConsumerException.class) .hasMessageContainingAll("auto", "commit", "disabled"); // set missing auto commit From 90848befde3de12d050b65fd6ee92327185ea85e Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Mon, 21 Nov 2022 14:17:35 +0000 Subject: [PATCH 7/7] review --- .../internal/AbstractParallelEoSStreamProcessor.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 93746075f..f94af66db 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -37,6 +37,7 @@ import static io.confluent.csid.utils.BackportUtils.toSeconds; import static io.confluent.csid.utils.StringUtils.msg; import static io.confluent.parallelconsumer.internal.State.*; +import static java.lang.Boolean.TRUE; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static lombok.AccessLevel.PRIVATE; @@ -424,12 +425,13 @@ private void checkAutoCommitIsDisabled(org.apache.kafka.clients.consumer.Consume autoCommitEnabledField.setAccessible(true); Boolean isAutoCommitEnabled = (Boolean) autoCommitEnabledField.get(coordinator); - if (isAutoCommitEnabled) + if (TRUE.equals(isAutoCommitEnabled)) throw new ParallelConsumerException("Consumer auto commit must be disabled, as commits are handled by the library."); } else if (consumer instanceof MockConsumer) { log.debug("Detected MockConsumer class which doesn't do auto commits"); } else { - throw new UnsupportedOperationException("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName()); + // Probably Mockito + log.error("Consumer is neither a KafkaConsumer nor a MockConsumer - cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName()); } } catch (NoSuchFieldException | IllegalAccessException e) { throw new IllegalStateException("Cannot check auto commit is disabled for consumer type: " + consumer.getClass().getName(), e);