diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java index 682f65f65..618e0dd59 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java @@ -33,11 +33,11 @@ public class ParallelEoSStreamProcessor extends AbstractParallelEoSStreamP * * @see ParallelConsumerOptions */ - public ParallelEoSStreamProcessor(final ParallelConsumerOptions newOptions, PCModule module) { + public ParallelEoSStreamProcessor(ParallelConsumerOptions newOptions, PCModule module) { super(newOptions, module); } - public ParallelEoSStreamProcessor(final ParallelConsumerOptions newOptions) { + public ParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) { super(newOptions); } @@ -58,40 +58,43 @@ public void poll(Consumer> usersVoidConsumptionFunction) { @Override @SneakyThrows public void pollAndProduceMany(Function, List>> userFunction, - Consumer> callback) { - // todo refactor out the producer system to a sub class + Consumer> callback) { if (!getOptions().isProducerSupplied()) { throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options"); } // wrap user func to add produce function - Function, List>> wrappedUserFunc = context -> { - List> recordListToProduce = carefullyRun(userFunction, context.getPollContext()); + Function, List>> wrappedUserFunc = + context -> userFunctionWrap(userFunction, context); - if (recordListToProduce.isEmpty()) { - log.debug("No result returned from function to send."); - } - log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", context, recordListToProduce); + supervisorLoop(wrappedUserFunc, callback); + } + + private List> userFunctionWrap(Function, List>> userFunction, + PollContextInternal context) { + List> recordListToProduce = carefullyRun(userFunction, context.getPollContext()); + + if (recordListToProduce.isEmpty()) { + log.debug("No result returned from function to send."); + } + log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", context, recordListToProduce); - List> results = new ArrayList<>(); - log.trace("Producing {} messages in result...", recordListToProduce.size()); + List> results = new ArrayList<>(); + log.trace("Producing {} messages in result...", recordListToProduce.size()); - var futures = super.getProducerManager().get().produceMessages(recordListToProduce); - try { - for (Tuple, Future> future : futures) { - var recordMetadata = TimeUtils.time(() -> + var futures = super.getProducerManager().get().produceMessages(recordListToProduce); + try { + for (Tuple, Future> future : futures) { + var recordMetadata = TimeUtils.time(() -> future.getRight().get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS)); - var result = new ConsumeProduceResult<>(context.getPollContext(), future.getLeft(), recordMetadata); - results.add(result); - } - } catch (Exception e) { - throw new InternalRuntimeError("Error while waiting for produce results", e); + var result = new ConsumeProduceResult<>(context.getPollContext(), future.getLeft(), recordMetadata); + results.add(result); } + } catch (Exception e) { + throw new InternalRuntimeError("Error while waiting for produce results", e); + } - return results; - }; - - supervisorLoop(wrappedUserFunc, callback); + return results; } @Override @@ -114,8 +117,8 @@ public void pollAndProduce(Function, ProducerRecord> use @Override @SneakyThrows - public void pollAndProduce(Function, ProducerRecord> userFunction, - Consumer> callback) { + public void pollAndProduce(Function, ProducerRecord> userFunction, + Consumer> callback) { pollAndProduceMany(consumerRecord -> UniLists.of(userFunction.apply(consumerRecord)), callback); } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index d969df385..0355cc421 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -1184,7 +1184,7 @@ public void registerWork(EpochAndRecordsMap polledRecords) { * @see #blockableControlThread */ public void notifySomethingToDo() { - boolean noTransactionInProgress = !producerManager.map(ProducerManager::isTransactionInProgress).orElse(false); + boolean noTransactionInProgress = !producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false); if (noTransactionInProgress) { log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!"); interruptControlThread(); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java index dee06f242..5961ee073 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/PCModule.java @@ -12,8 +12,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; -import java.util.function.Supplier; - /** * Minimum dependency injection system, modled on how Dagger works. *

@@ -36,34 +34,46 @@ public ParallelConsumerOptions options() { return optionsInstance; } - private ProducerManager kvProducerManager; + private ProducerWrapper producerWrapper; + + protected ProducerWrapper producerWrap() { + if (this.producerWrapper == null) { + this.producerWrapper = new ProducerWrapper<>(options()); + } + return producerWrapper; + } + + private ProducerManager producerManager; - //Provides protected ProducerManager producerManager() { - if (kvProducerManager == null) { - this.kvProducerManager = new ProducerManager(producer(), consumerManager(), workManager(), options()); + if (producerManager == null) { + this.producerManager = new ProducerManager<>(producerWrap(), consumerManager(), workManager(), options()); } - return kvProducerManager; + return producerManager; } - private Producer producer() { + public Producer producer() { return optionsInstance.getProducer(); } - private ConsumerManager consumerManager; + public Consumer consumer() { + return optionsInstance.getConsumer(); + } + + private ConsumerManager consumerManager; protected ConsumerManager consumerManager() { if (consumerManager == null) { - consumerManager = new ConsumerManager(optionsInstance.getConsumer()); + consumerManager = new ConsumerManager<>(optionsInstance.getConsumer()); } return consumerManager; } - private WorkManager workManager; + private WorkManager workManager; public WorkManager workManager() { if (workManager == null) { - workManager = new WorkManager(this, dynamicExtraLoadFactor(), TimeUtils.getClock()); + workManager = new WorkManager<>(this, dynamicExtraLoadFactor(), TimeUtils.getClock()); } return workManager; } @@ -81,21 +91,13 @@ protected DynamicLoadFactor dynamicExtraLoadFactor() { return dynamicLoadFactor; } - private BrokerPollSystem brokerPollSystem; + private BrokerPollSystem brokerPollSystem; protected BrokerPollSystem brokerPoller(AbstractParallelEoSStreamProcessor pc) { if (brokerPollSystem == null) { -// final ParallelEoSStreamProcessor pc = pc(); brokerPollSystem = new BrokerPollSystem<>(consumerManager(), workManager(), pc, options()); } return brokerPollSystem; } - public Supplier> pcSupplier() { - return this::pc; - } - - public Consumer consumer() { - return optionsInstance.getConsumer(); - } } \ No newline at end of file diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java index 93f49ea5e..e86f33a7e 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerManager.java @@ -8,17 +8,14 @@ import io.confluent.parallelconsumer.ParallelConsumerOptions; import io.confluent.parallelconsumer.ParallelStreamProcessor; import io.confluent.parallelconsumer.state.WorkManager; -import lombok.SneakyThrows; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.*; -import org.apache.kafka.clients.producer.internals.TransactionManager; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.time.Duration; import java.util.ArrayList; import java.util.ConcurrentModificationException; @@ -29,45 +26,41 @@ import static io.confluent.csid.utils.StringUtils.msg; +/** + * Sub system for interacting with the Producer and managing transactions (and thus offset committing through the + * Producer). + */ @Slf4j +@ToString(onlyExplicitlyIncluded = true) public class ProducerManager extends AbstractOffsetCommitter implements OffsetCommitter { - protected final Producer producer; - - private final ParallelConsumerOptions options; - - private final boolean producerIsConfiguredForTransactions; + protected final ProducerWrapper producer; + private final ParallelConsumerOptions options; /** - * The {@link KafkaProducer) isn't actually completely thread safe, at least when using it transactionally. We must + * The {@link KafkaProducer} isn't actually completely thread safe, at least when using it transactionally. We must * be careful not to send messages to the producer, while we are committing a transaction - "Cannot call send in * state COMMITTING_TRANSACTION". */ private ReentrantReadWriteLock producerTransactionLock; - // nasty reflection - private Field txManagerField; - private Method txManagerMethodIsCompleting; - private Method txManagerMethodIsReady; - - public ProducerManager(final Producer newProducer, final ConsumerManager newConsumer, final WorkManager wm, ParallelConsumerOptions options) { + public ProducerManager(ProducerWrapper newProducer, + ConsumerManager newConsumer, + WorkManager wm, + ParallelConsumerOptions options) { super(newConsumer, wm); this.producer = newProducer; this.options = options; - producerIsConfiguredForTransactions = setupReflection(); - initProducer(); } private void initProducer() { producerTransactionLock = new ReentrantReadWriteLock(true); - // String transactionIdProp = options.getProducerConfig().getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG); - // boolean txIdSupplied = isBlank(transactionIdProp); if (options.isUsingTransactionalProducer()) { - if (!producerIsConfiguredForTransactions) { + if (!producer.isConfiguredForTransactions()) { throw new IllegalArgumentException("Using transactional option, yet Producer doesn't have a transaction ID - Producer needs a transaction id"); } try { @@ -79,41 +72,14 @@ private void initProducer() { throw e; } } else { - if (producerIsConfiguredForTransactions) { - throw new IllegalArgumentException("Using non-transactional producer option, but Producer has a transaction ID - " + - "the Producer must not have a transaction ID for this option. This is because having such an ID forces the " + - "Producer into transactional mode - i.e. you cannot use it without using transactions."); + if (producer.isConfiguredForTransactions()) { + throw new IllegalArgumentException("Using non-transactional producer option, but Producer has a transaction ID - " + + "the Producer must not have a transaction ID for this option. This is because having such an ID forces the " + + "Producer into transactional mode - i.e. you cannot use it without using transactions."); } } } - /** - * Nasty reflection but better than relying on user supplying their config - * - * @see AbstractParallelEoSStreamProcessor#checkAutoCommitIsDisabled - */ - @SneakyThrows - private boolean getProducerIsTransactional() { - if (producer instanceof MockProducer) { - // can act as both, delegate to user selection - return options.isUsingTransactionalProducer(); - } else { - TransactionManager transactionManager = getTransactionManager(); - if (transactionManager == null) { - return false; - } else { - return transactionManager.isTransactional(); - } - } - } - - @SneakyThrows - private TransactionManager getTransactionManager() { - if (txManagerField == null) return null; - TransactionManager transactionManager = (TransactionManager) txManagerField.get(producer); - return transactionManager; - } - /** * Produce a message back to the broker. *

@@ -179,10 +145,10 @@ protected void commitOffsets(final Map offset if (retryCount > arbitrarilyChosenLimitForArbitraryErrorSituation) { String msg = msg("Retired too many times ({} > limit of {}), giving up. See error above.", retryCount, arbitrarilyChosenLimitForArbitraryErrorSituation); log.error(msg, lastErrorSavedForRethrow); - throw new RuntimeException(msg, lastErrorSavedForRethrow); + throw new InternalRuntimeError(msg, lastErrorSavedForRethrow); } try { - if (producer instanceof MockProducer) { + if (producer.isMockProducer()) { // see bug https://issues.apache.org/jira/browse/KAFKA-10382 // KAFKA-10382 - MockProducer is not ThreadSafe, ideally it should be as the implementation it mocks is synchronized (producer) { @@ -194,15 +160,16 @@ protected void commitOffsets(final Map offset // producer commit lock should already be acquired at this point, before work was retrieved to commit, // so that more messages don't sneak into this tx block - the consumer records of which won't yet be // in this offset collection - ensureLockHeld(); + ensureCommitLockHeld(); + // TODO talk about alternatives to this brute force approach for retrying committing transactions boolean retrying = retryCount > 0; if (retrying) { - if (isTransactionCompleting()) { + if (producer.isTransactionCompleting()) { // try wait again - producer.commitTransaction(); + commitTransaction(); } - if (isTransactionReady()) { + if (producer.isTransactionReady()) { // tx has completed since we last tried, start a new one producer.beginTransaction(); } @@ -213,7 +180,7 @@ protected void commitOffsets(final Map offset } } else { // happy path - producer.commitTransaction(); + commitTransaction(); producer.beginTransaction(); } } @@ -230,69 +197,35 @@ protected void commitOffsets(final Map offset } } - /** - * @return boolean which shows if we are setup for transactions or now - */ - @SneakyThrows - private boolean setupReflection() { - if (producer instanceof KafkaProducer) { - txManagerField = producer.getClass().getDeclaredField("transactionManager"); - txManagerField.setAccessible(true); - - boolean producerIsConfiguredForTransactions = getProducerIsTransactional(); - if (producerIsConfiguredForTransactions) { - TransactionManager transactionManager = getTransactionManager(); - txManagerMethodIsCompleting = transactionManager.getClass().getDeclaredMethod("isCompleting"); - txManagerMethodIsCompleting.setAccessible(true); - - txManagerMethodIsReady = transactionManager.getClass().getDeclaredMethod("isReady"); - txManagerMethodIsReady.setAccessible(true); - } - return producerIsConfiguredForTransactions; - } else if (producer instanceof MockProducer) { - // can act as both, delegate to user selection - return options.isUsingTransactionalProducer(); - } else { - // unknown - return false; - } - } - - /** - * TODO talk about alternatives to this brute force approach for retrying committing transactions - */ - @SneakyThrows - private boolean isTransactionCompleting() { - if (producer instanceof MockProducer) return false; - return (boolean) txManagerMethodIsCompleting.invoke(getTransactionManager()); - } - - /** - * TODO talk about alternatives to this brute force approach for retrying committing transactions - */ - @SneakyThrows - private boolean isTransactionReady() { - if (producer instanceof MockProducer) return true; - return (boolean) txManagerMethodIsReady.invoke(getTransactionManager()); + private void commitTransaction() { + producer.commitTransaction(); } /** * Assumes the system is drained at this point, or draining is not desired. */ - public void close(final Duration timeout) { + public void close(Duration timeout) { log.debug("Closing producer, assuming no more in flight..."); - if (options.isUsingTransactionalProducer() && !isTransactionReady()) { + if (options.isUsingTransactionalProducer() && !producer.isTransactionReady()) { acquireCommitLock(); try { // close started after tx began, but before work was done, otherwise a tx wouldn't have been started - producer.abortTransaction(); + abortTransaction(); } finally { releaseCommitLock(); } } + closeProducer(timeout); + } + + private void closeProducer(Duration timeout) { producer.close(timeout); } + private void abortTransaction() { + producer.abortTransaction(); + } + private void acquireCommitLock() { if (producerTransactionLock.getWriteHoldCount() > 0) throw new ConcurrentModificationException("Lock already held"); @@ -304,19 +237,23 @@ private void acquireCommitLock() { } private void releaseCommitLock() { - log.trace("Release commit lock"); + log.debug("Releasing commit lock..."); ReentrantReadWriteLock.WriteLock writeLock = producerTransactionLock.writeLock(); if (!producerTransactionLock.isWriteLockedByCurrentThread()) throw new IllegalStateException("Not held be me"); writeLock.unlock(); + log.debug("Commit lock released."); } - private void ensureLockHeld() { + private void ensureCommitLockHeld() { if (!producerTransactionLock.isWriteLockedByCurrentThread()) throw new IllegalStateException("Expected commit lock to be held"); } - public boolean isTransactionInProgress() { + /** + * @return true if the commit lock has been acquired by any thread. + */ + public boolean isTransactionCommittingInProgress() { return producerTransactionLock.isWriteLocked(); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerWrapper.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerWrapper.java new file mode 100644 index 000000000..86a9ac9cd --- /dev/null +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ProducerWrapper.java @@ -0,0 +1,154 @@ +package io.confluent.parallelconsumer.internal; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.experimental.Delegate; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.internals.TransactionManager; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Map; + +/** + * Our extension of the standard Producer to mostly add some introspection functions and state tracking. + * + * @author Antony Stubbs + */ +@RequiredArgsConstructor +public class ProducerWrapper implements Producer { + + private final ParallelConsumerOptions options; + + /** + * Cached discovery of whether the underlying Producer has been set up for transactions or not. + */ + private final boolean producerIsConfiguredForTransactions; + + // nasty reflection + private Field txManagerField; + private Method txManagerMethodIsCompleting; + private Method txManagerMethodIsReady; + + @Delegate(excludes = Excludes.class) + private final Producer producer; + + public ProducerWrapper(ParallelConsumerOptions options) { + this.options = options; + producer = options.getProducer(); + this.producerIsConfiguredForTransactions = discoverIfProducerIsConfiguredForTransactions(); + } + + public boolean isMockProducer() { + return producer instanceof MockProducer; + } + + public boolean isConfiguredForTransactions() { + return this.producerIsConfiguredForTransactions; + } + + /** + * Type erasure issue fix + */ + interface Excludes { + void sendOffsetsToTransaction(Map offsets, + String consumerGroupId) throws ProducerFencedException; + + void sendOffsetsToTransaction(Map offsets, + ConsumerGroupMetadata groupMetadata) throws ProducerFencedException; + } + + /** + * @deprecated use {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} + */ + @Deprecated + public void sendOffsetsToTransaction(Map offsets, + String consumerGroupId) throws ProducerFencedException { + sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(consumerGroupId)); + } + + public void sendOffsetsToTransaction(Map offsets, + ConsumerGroupMetadata groupMetadata) throws ProducerFencedException { + producer.sendOffsetsToTransaction(offsets, groupMetadata); + } + + + /** + * @return boolean which shows if we are set up for transactions or not + */ + @SneakyThrows + private boolean discoverIfProducerIsConfiguredForTransactions() { + if (producer instanceof KafkaProducer) { + txManagerField = producer.getClass().getDeclaredField("transactionManager"); + txManagerField.setAccessible(true); + + boolean producerIsConfiguredForTransactions = getProducerIsTransactional(); + if (producerIsConfiguredForTransactions) { + TransactionManager transactionManager = getTransactionManager(); + txManagerMethodIsCompleting = transactionManager.getClass().getDeclaredMethod("isCompleting"); + txManagerMethodIsCompleting.setAccessible(true); + + txManagerMethodIsReady = transactionManager.getClass().getDeclaredMethod("isReady"); + txManagerMethodIsReady.setAccessible(true); + } + return producerIsConfiguredForTransactions; + } else if (producer instanceof MockProducer) { + // can act as both, delegate to user selection + return options.isUsingTransactionalProducer(); + } else { + // unknown + return false; + } + } + + /** + * Nasty reflection but better than relying on user supplying a copy of their config, maybe + * + * @see AbstractParallelEoSStreamProcessor#checkAutoCommitIsDisabled + */ + @SneakyThrows + private boolean getProducerIsTransactional() { + if (producer instanceof MockProducer) { + // can act as both, delegate to user selection + return options.isUsingTransactionalProducer(); + } else { + TransactionManager transactionManager = getTransactionManager(); + if (transactionManager == null) { + return false; + } else { + return transactionManager.isTransactional(); + } + } + } + + @SneakyThrows + private TransactionManager getTransactionManager() { + if (txManagerField == null) return null; + TransactionManager transactionManager = (TransactionManager) txManagerField.get(producer); + return transactionManager; + } + + @SneakyThrows + protected boolean isTransactionCompleting() { + if (producer instanceof MockProducer) return false; + return (boolean) txManagerMethodIsCompleting.invoke(getTransactionManager()); + } + + @SneakyThrows + protected boolean isTransactionReady() { + if (producer instanceof MockProducer) return true; + return (boolean) txManagerMethodIsReady.invoke(getTransactionManager()); + } + +} diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionAndCommitModeTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionAndCommitModeTest.java index 597eecd5a..c2bc5257c 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionAndCommitModeTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionAndCommitModeTest.java @@ -35,8 +35,6 @@ import java.util.*; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import static io.confluent.csid.utils.StringUtils.msg; import static io.confluent.parallelconsumer.AbstractParallelEoSStreamProcessorTestBase.defaultTimeout; @@ -44,7 +42,8 @@ import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER; import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.*; import static java.time.Duration.ofSeconds; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.waitAtMost; import static pl.tlinkowski.unij.api.UniLists.of; @@ -271,22 +270,4 @@ private void runTest(int maxPoll, CommitMode commitMode, ProcessingOrder order, bar.close(); } - @Test - void customRepresentationFail() { - List one = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); - List two = IntStream.range(999, 2000).boxed().collect(Collectors.toList()); - assertThatThrownBy(() -> assertThat(one).withRepresentation(new TrimListRepresentation()).containsAll(two)) - .hasMessageContaining("trimmed"); - } - - @Test - void customRepresentationPass() { - Assertions.useRepresentation(new TrimListRepresentation()); - List one = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); - List two = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); - SoftAssertions all = new SoftAssertions(); - all.assertThat(one).containsAll(two); - all.assertAll(); - } - } diff --git a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java index 54694f973..bcfb0abac 100644 --- a/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java +++ b/parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/TransactionMarkersTest.java @@ -36,8 +36,7 @@ * @see OffsetSimultaneousEncoder#OffsetSimultaneousEncoder */ @Slf4j -public -class TransactionMarkersTest extends BrokerIntegrationTest { +public class TransactionMarkersTest extends BrokerIntegrationTest { /** * Block all records beyond the second record diff --git a/parallel-consumer-core/src/test/java/io/confluent/csid/utils/TrimListRepresentationTest.java b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/TrimListRepresentationTest.java new file mode 100644 index 000000000..db302b301 --- /dev/null +++ b/parallel-consumer-core/src/test/java/io/confluent/csid/utils/TrimListRepresentationTest.java @@ -0,0 +1,42 @@ +package io.confluent.csid.utils; + +/*- + * Copyright (C) 2020-2022 Confluent, Inc. + */ + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * @author Antony Stubbs + * @see TrimListRepresentation + */ +class TrimListRepresentationTest { + + @Test + void customRepresentationFail() { + List one = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); + List two = IntStream.range(999, 2000).boxed().collect(Collectors.toList()); + assertThatThrownBy(() -> assertThat(one).withRepresentation(new TrimListRepresentation()).containsAll(two)) + .hasMessageContaining("trimmed"); + } + + @Test + void customRepresentationPass() { + Assertions.useRepresentation(new TrimListRepresentation()); + List one = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); + List two = IntStream.range(0, 1000).boxed().collect(Collectors.toList()); + SoftAssertions all = new SoftAssertions(); + all.assertThat(one).containsAll(two); + all.assertAll(); + } + +} \ No newline at end of file