Skip to content

Commit

Permalink
new parent: move
Browse files Browse the repository at this point in the history
  • Loading branch information
astubbs committed Jul 28, 2021
1 parent 6b63a5c commit 6e5f905
Show file tree
Hide file tree
Showing 19 changed files with 54 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/*-
* Copyright (C) 2020-2021 Confluent, Inc.
*/
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -18,7 +19,7 @@ static <KK, VV> JStreamParallelStreamProcessor<KK, VV> createJStreamEosStreamPro
}

/**
* Like {@link ParentParallelEoSStreamProcessor#pollAndProduceMany} but instead of callbacks, streams the results
* Like {@link AbstractParallelEoSStreamProcessor#pollAndProduceMany} but instead of callbacks, streams the results
* instead, after the produce result is ack'd by Kafka.
*
* @return a stream of results of applying the function to the polled records
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Copyright (C) 2020-2021 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.DrainingCloseable;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand All @@ -13,15 +14,16 @@
import java.util.regex.Pattern;

// tag::javadoc[]

/**
* Asynchronous / concurrent message consumer for Kafka.
* <p>
* Currently there is no direct implementation, only the {@link ParallelStreamProcessor} version (see {@link
* ParentParallelEoSStreamProcessor}), but there may be in the future.
* AbstractParallelEoSStreamProcessor}), but there may be in the future.
*
* @param <K> key consume / produce key type
* @param <V> value consume / produce value type
* @see ParentParallelEoSStreamProcessor
* @see AbstractParallelEoSStreamProcessor
* @see #poll(Consumer)
*/
// end::javadoc[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Copyright (C) 2020-2021 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -19,7 +20,7 @@
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;

/**
* The options for the {@link ParentParallelEoSStreamProcessor} system.
* The options for the {@link AbstractParallelEoSStreamProcessor} system.
*
* @see #builder()
* @see ParallelConsumerOptions.ParallelConsumerOptionsBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -15,7 +16,7 @@
import static io.confluent.parallelconsumer.internal.UserFunctions.carefullyRun;

@Slf4j
public class ParallelEoSStreamProcessor<K, V> extends ParentParallelEoSStreamProcessor<K, V>
public class ParallelEoSStreamProcessor<K, V> extends AbstractParallelEoSStreamProcessor<K, V>
implements ParallelStreamProcessor<K, V> {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.confluent.parallelconsumer;
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
*/

import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.internal.*;
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.AccessLevel;
Expand Down Expand Up @@ -48,7 +49,7 @@
* @see ParallelConsumer
*/
@Slf4j
public abstract class ParentParallelEoSStreamProcessor<K, V> implements ParallelConsumer<K, V>, ConsumerRebalanceListener, Closeable {
public abstract class AbstractParallelEoSStreamProcessor<K, V> implements ParallelConsumer<K, V>, ConsumerRebalanceListener, Closeable {

public static final String MDC_INSTANCE_ID = "pcId";

Expand Down Expand Up @@ -183,7 +184,7 @@ public Exception getFailureCause() {
*
* @see ParallelConsumerOptions
*/
public ParentParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) {
public AbstractParallelEoSStreamProcessor(ParallelConsumerOptions newOptions) {
Objects.requireNonNull(newOptions, "Options must be supplied");

log.info("Confluent Parallel Consumer initialise... Options: {}", newOptions);
Expand Down Expand Up @@ -1010,7 +1011,7 @@ public long workRemaining() {
* <p>
* Useful for testing and controlling loop progression.
*/
void addLoopEndCallBack(Runnable r) {
public void addLoopEndCallBack(Runnable r) {
this.controlLoopHooks.add(r);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -24,7 +23,7 @@
import java.util.concurrent.*;

import static io.confluent.csid.utils.BackportUtils.toSeconds;
import static io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor.MDC_INSTANCE_ID;
import static io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.MDC_INSTANCE_ID;
import static io.confluent.parallelconsumer.internal.State.closed;
import static io.confluent.parallelconsumer.internal.State.running;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -47,7 +46,7 @@ public class BrokerPollSystem<K, V> implements OffsetCommitter {
@Getter
private volatile boolean paused = false;

private final ParentParallelEoSStreamProcessor<K, V> pc;
private final AbstractParallelEoSStreamProcessor<K, V> pc;

private Optional<ConsumerOffsetCommitter<K, V>> committer = Optional.empty();

Expand All @@ -61,7 +60,7 @@ public class BrokerPollSystem<K, V> implements OffsetCommitter {

private final WorkManager<K, V> wm;

public BrokerPollSystem(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm, ParentParallelEoSStreamProcessor<K, V> pc, final ParallelConsumerOptions options) {
public BrokerPollSystem(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm, AbstractParallelEoSStreamProcessor<K, V> pc, final ParallelConsumerOptions options) {
this.wm = wm;
this.pc = pc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -151,7 +150,7 @@ private void commitAndWait() {

try {
log.debug("Waiting on a commit response");
Duration timeout = ParentParallelEoSStreamProcessor.DEFAULT_TIMEOUT;
Duration timeout = AbstractParallelEoSStreamProcessor.DEFAULT_TIMEOUT;
CommitResponse take = commitResponseQueue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); // blocks, drain until we find our response
if (take == null)
throw new InternalRuntimeError(msg("Timeout waiting for commit response {} to request {}", timeout, commitRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkContainer;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -16,7 +15,7 @@
* Overrides key aspects required in common for other threading engines like Vert.x and Reactor
*/
@Slf4j
public abstract class ExternalEngine<K, V> extends ParentParallelEoSStreamProcessor<K, V> {
public abstract class ExternalEngine<K, V> extends AbstractParallelEoSStreamProcessor<K, V> {

public ExternalEngine(final ParallelConsumerOptions newOptions) {
super(newOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.confluent.parallelconsumer.ParallelConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -91,7 +90,7 @@ private void initProducer() {
/**
* Nasty reflection but better than relying on user supplying their config
*
* @see ParentParallelEoSStreamProcessor#checkAutoCommitIsDisabled
* @see AbstractParallelEoSStreamProcessor#checkAutoCommitIsDisabled
*/
@SneakyThrows
private boolean getProducerIsTransactional() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Copyright (C) 2020-2021 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.InternalRuntimeError;
import io.confluent.parallelconsumer.offsets.EncodingNotSupportedException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager;
Expand Down Expand Up @@ -105,9 +105,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
/**
* Clear offset map for revoked partitions
* <p>
* {@link ParentParallelEoSStreamProcessor#onPartitionsRevoked} handles committing off offsets upon revoke
* {@link AbstractParallelEoSStreamProcessor#onPartitionsRevoked} handles committing off offsets upon revoke
*
* @see ParentParallelEoSStreamProcessor#onPartitionsRevoked
* @see AbstractParallelEoSStreamProcessor#onPartitionsRevoked
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.confluent.csid.utils.WallClock;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.DynamicLoadFactor;
import io.confluent.parallelconsumer.internal.RateLimiter;
import lombok.Getter;
Expand Down Expand Up @@ -132,9 +132,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
/**
* Clear offset map for revoked partitions
* <p>
* {@link ParentParallelEoSStreamProcessor#onPartitionsRevoked} handles committing off offsets upon revoke
* {@link AbstractParallelEoSStreamProcessor#onPartitionsRevoked} handles committing off offsets upon revoke
*
* @see ParentParallelEoSStreamProcessor#onPartitionsRevoked
* @see AbstractParallelEoSStreamProcessor#onPartitionsRevoked
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Copyright (C) 2020-2021 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -107,7 +107,7 @@ public synchronized void close(final long timeout, final TimeUnit unit) {
* assignments, use reflection to access the registered rebalance listener, call the listener, and only then close
* the consumer.
*
* @see ParentParallelEoSStreamProcessor#onPartitionsRevoked
* @see AbstractParallelEoSStreamProcessor#onPartitionsRevoked
*/
private void revokeAssignment() throws NoSuchFieldException, IllegalAccessException {
ConsumerRebalanceListener consumerRebalanceListener = getRebalanceListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.confluent.csid.utils.KafkaTestUtils;
import io.confluent.csid.utils.LongPollingMockConsumer;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.SneakyThrows;
Expand Down Expand Up @@ -61,19 +62,19 @@ public abstract class AbstractParallelEoSStreamProcessorTestBase {
public static final int DEFAULT_BROKER_POLL_FREQUENCY_MS = 100;

/**
* The commit interval for the main {@link ParentParallelEoSStreamProcessor} control thread. Actually the timeout
* The commit interval for the main {@link AbstractParallelEoSStreamProcessor} control thread. Actually the timeout
* that we poll the {@link LinkedBlockingQueue} for. A lower value will increase the frequency of control loop
* cycles, making our test waiting go faster.
*
* @see ParentParallelEoSStreamProcessor#workMailBox
* @see ParentParallelEoSStreamProcessor#processWorkCompleteMailBox
* @see AbstractParallelEoSStreamProcessor#workMailBox
* @see AbstractParallelEoSStreamProcessor#processWorkCompleteMailBox
*/
public static final int DEFAULT_COMMIT_INTERVAL_MAX_MS = 100;

protected LongPollingMockConsumer<String, String> consumerSpy;
protected MockProducer<String, String> producerSpy;

protected ParentParallelEoSStreamProcessor<String, String> parentParallelConsumer;
protected AbstractParallelEoSStreamProcessor<String, String> parentParallelConsumer;

public static int defaultTimeoutSeconds = 10;

Expand Down Expand Up @@ -175,7 +176,7 @@ protected void instantiateConsumerProducer() {
}

/**
* Need to make sure we only use {@link ParentParallelEoSStreamProcessor#subscribe} methods, and not do manual
* Need to make sure we only use {@link AbstractParallelEoSStreamProcessor#subscribe} methods, and not do manual
* assignment, otherwise rebalance listeners don't fire (because there are never rebalances).
*/
protected void subscribeParallelConsumerAndMockConsumerTo(String topic) {
Expand Down Expand Up @@ -208,14 +209,14 @@ protected void setupParallelConsumerInstance(ParallelConsumerOptions parallelCon
loopCountRef = attachLoopCounter(parentParallelConsumer);
}

protected abstract ParentParallelEoSStreamProcessor<String, String> initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions);
protected abstract AbstractParallelEoSStreamProcessor<String, String> initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions);

protected void sendSecondRecord(MockConsumer<String, String> consumer) {
secondRecord = ktu.makeRecord("key-0", "v1");
consumer.addRecord(secondRecord);
}

protected AtomicReference<Integer> attachLoopCounter(ParentParallelEoSStreamProcessor parallelConsumer) {
protected AtomicReference<Integer> attachLoopCounter(AbstractParallelEoSStreamProcessor parallelConsumer) {
final AtomicReference<Integer> currentLoop = new AtomicReference<>(0);
parentParallelConsumer.addLoopEndCallBack(() -> {
Integer currentNumber = currentLoop.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.confluent.csid.utils.LatchTestUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.*;
Expand Down Expand Up @@ -48,7 +49,7 @@
@Slf4j
public class ParallelEoSStreamProcessorTest extends ParallelEoSStreamProcessorTestBase {

static class MyAction implements Function<ConsumerRecord<String, String>, String> {
public static class MyAction implements Function<ConsumerRecord<String, String>, String> {

@Override
public String apply(ConsumerRecord<String, String> record) {
Expand Down Expand Up @@ -264,7 +265,7 @@ void offsetCommitsAreIsolatedPerPartition(CommitMode commitMode) {
Assumptions.assumeThat(parallelConsumer)
.as("Should only test on core PC - this test is very complicated to get to work with vert.x " +
"thread system, as the event and locking system needed is quite different")
.isExactlyInstanceOf(ParentParallelEoSStreamProcessor.class);
.isExactlyInstanceOf(AbstractParallelEoSStreamProcessor.class);

setupParallelConsumerInstance(getBaseOptions(commitMode).toBuilder()
.ordering(UNORDERED)
Expand Down Expand Up @@ -450,7 +451,7 @@ public void processInKeyOrder(CommitMode commitMode) {
primeFirstRecord();

// sanity check
assertThat(parallelConsumer.wm.getOptions().getOrdering()).isEqualTo(KEY);
assertThat(parallelConsumer.getWm().getOptions().getOrdering()).isEqualTo(KEY);

sendSecondRecord(consumerSpy);

Expand Down Expand Up @@ -606,7 +607,7 @@ void processInKeyOrderWorkNotReturnedDoesntBreakCommits() {
sendSecondRecord(consumerSpy);

// sanity check
assertThat(parallelConsumer.wm.getOptions().getOrdering()).isEqualTo(KEY);
assertThat(parallelConsumer.getWm().getOptions().getOrdering()).isEqualTo(KEY);

// 0,1 previously sent to partition 0
// send one more, with same key of 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* Copyright (C) 2020-2021 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -12,7 +13,7 @@ public class ParallelEoSStreamProcessorTestBase extends AbstractParallelEoSStrea
protected ParallelEoSStreamProcessor<String, String> parallelConsumer;

@Override
protected ParentParallelEoSStreamProcessor<String, String> initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) {
protected AbstractParallelEoSStreamProcessor<String, String> initAsyncConsumer(ParallelConsumerOptions parallelConsumerOptions) {
parallelConsumer = initPollingAsyncConsumer(parallelConsumerOptions);
return parallelConsumer;
}
Expand Down
2 changes: 1 addition & 1 deletion parallel-consumer-core/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<!-- <logger name="io.confluent.parallelconsumer" level="debug"/>-->
<!-- <logger name="io.confluent.parallelconsumer" level="trace"/>-->
<!-- <logger name="io.confluent.parallelconsumer" level="error"/>-->
<!-- <logger name="io.confluent.parallelconsumer.ParentParallelEoSStreamProcessor" level="trace"/>-->
<!-- <logger name="io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor" level="trace"/>-->
<!-- <logger name="io.confluent.parallelconsumer.internal.BrokerPollSystem" level="trace"/>-->
<!-- <logger name="io.confluent.parallelconsumer.integrationTests.BrokerIntegrationTest" level="info"/> &lt;!&ndash; docker logs &ndash;&gt;-->
<!-- <logger name="io.confluent.csid" level="info"/>-->
Expand Down
Loading

0 comments on commit 6e5f905

Please sign in to comment.