Skip to content

Commit

Permalink
refactor: Refactoring split ProducerManager out to ProducerWrapper (#399
Browse files Browse the repository at this point in the history
)

Cohesion FTW
  • Loading branch information
astubbs authored Sep 2, 2022
1 parent 5355755 commit 32ae364
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ public class ParallelEoSStreamProcessor<K, V> extends AbstractParallelEoSStreamP
*
* @see ParallelConsumerOptions
*/
public ParallelEoSStreamProcessor(final ParallelConsumerOptions<K, V> newOptions, PCModule<K, V> module) {
public ParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions, PCModule<K, V> module) {
super(newOptions, module);
}

public ParallelEoSStreamProcessor(final ParallelConsumerOptions<K, V> newOptions) {
public ParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
super(newOptions);
}

Expand All @@ -58,40 +58,43 @@ public void poll(Consumer<PollContext<K, V>> usersVoidConsumptionFunction) {
@Override
@SneakyThrows
public void pollAndProduceMany(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback) {
// todo refactor out the producer system to a sub class
Consumer<ConsumeProduceResult<K, V, K, V>> callback) {
if (!getOptions().isProducerSupplied()) {
throw new IllegalArgumentException("To use the produce flows you must supply a Producer in the options");
}

// wrap user func to add produce function
Function<PollContextInternal<K, V>, List<ConsumeProduceResult<K, V, K, V>>> wrappedUserFunc = context -> {
List<ProducerRecord<K, V>> recordListToProduce = carefullyRun(userFunction, context.getPollContext());
Function<PollContextInternal<K, V>, List<ConsumeProduceResult<K, V, K, V>>> wrappedUserFunc =
context -> userFunctionWrap(userFunction, context);

if (recordListToProduce.isEmpty()) {
log.debug("No result returned from function to send.");
}
log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", context, recordListToProduce);
supervisorLoop(wrappedUserFunc, callback);
}

private List<ConsumeProduceResult<K, V, K, V>> userFunctionWrap(Function<PollContext<K, V>, List<ProducerRecord<K, V>>> userFunction,
PollContextInternal<K, V> context) {
List<ProducerRecord<K, V>> recordListToProduce = carefullyRun(userFunction, context.getPollContext());

if (recordListToProduce.isEmpty()) {
log.debug("No result returned from function to send.");
}
log.trace("asyncPoll and Stream - Consumed a record ({}), and returning a derivative result record to be produced: {}", context, recordListToProduce);

List<ConsumeProduceResult<K, V, K, V>> results = new ArrayList<>();
log.trace("Producing {} messages in result...", recordListToProduce.size());
List<ConsumeProduceResult<K, V, K, V>> results = new ArrayList<>();
log.trace("Producing {} messages in result...", recordListToProduce.size());

var futures = super.getProducerManager().get().produceMessages(recordListToProduce);
try {
for (Tuple<ProducerRecord<K, V>, Future<RecordMetadata>> future : futures) {
var recordMetadata = TimeUtils.time(() ->
var futures = super.getProducerManager().get().produceMessages(recordListToProduce);
try {
for (Tuple<ProducerRecord<K, V>, Future<RecordMetadata>> future : futures) {
var recordMetadata = TimeUtils.time(() ->
future.getRight().get(options.getSendTimeout().toMillis(), TimeUnit.MILLISECONDS));
var result = new ConsumeProduceResult<>(context.getPollContext(), future.getLeft(), recordMetadata);
results.add(result);
}
} catch (Exception e) {
throw new InternalRuntimeError("Error while waiting for produce results", e);
var result = new ConsumeProduceResult<>(context.getPollContext(), future.getLeft(), recordMetadata);
results.add(result);
}
} catch (Exception e) {
throw new InternalRuntimeError("Error while waiting for produce results", e);
}

return results;
};

supervisorLoop(wrappedUserFunc, callback);
return results;
}

@Override
Expand All @@ -114,8 +117,8 @@ public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> use

@Override
@SneakyThrows
public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback) {
public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> userFunction,
Consumer<ConsumeProduceResult<K, V, K, V>> callback) {
pollAndProduceMany(consumerRecord -> UniLists.of(userFunction.apply(consumerRecord)), callback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ public void registerWork(EpochAndRecordsMap<K, V> polledRecords) {
* @see #blockableControlThread
*/
public void notifySomethingToDo() {
boolean noTransactionInProgress = !producerManager.map(ProducerManager::isTransactionInProgress).orElse(false);
boolean noTransactionInProgress = !producerManager.map(ProducerManager::isTransactionCommittingInProgress).orElse(false);
if (noTransactionInProgress) {
log.trace("Interrupting control thread: Knock knock, wake up! You've got mail (tm)!");
interruptControlThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

import java.util.function.Supplier;

/**
* Minimum dependency injection system, modled on how Dagger works.
* <p>
Expand All @@ -36,34 +34,46 @@ public ParallelConsumerOptions<K, V> options() {
return optionsInstance;
}

private ProducerManager<K, V> kvProducerManager;
private ProducerWrapper<K, V> producerWrapper;

protected ProducerWrapper<K, V> producerWrap() {
if (this.producerWrapper == null) {
this.producerWrapper = new ProducerWrapper<>(options());
}
return producerWrapper;
}

private ProducerManager<K, V> producerManager;

//Provides
protected ProducerManager<K, V> producerManager() {
if (kvProducerManager == null) {
this.kvProducerManager = new ProducerManager<K, V>(producer(), consumerManager(), workManager(), options());
if (producerManager == null) {
this.producerManager = new ProducerManager<>(producerWrap(), consumerManager(), workManager(), options());
}
return kvProducerManager;
return producerManager;
}

private Producer<K, V> producer() {
public Producer<K, V> producer() {
return optionsInstance.getProducer();
}

private ConsumerManager consumerManager;
public Consumer<K, V> consumer() {
return optionsInstance.getConsumer();
}

private ConsumerManager<K, V> consumerManager;

protected ConsumerManager<K, V> consumerManager() {
if (consumerManager == null) {
consumerManager = new ConsumerManager(optionsInstance.getConsumer());
consumerManager = new ConsumerManager<>(optionsInstance.getConsumer());
}
return consumerManager;
}

private WorkManager workManager;
private WorkManager<K, V> workManager;

public WorkManager<K, V> workManager() {
if (workManager == null) {
workManager = new WorkManager<K, V>(this, dynamicExtraLoadFactor(), TimeUtils.getClock());
workManager = new WorkManager<>(this, dynamicExtraLoadFactor(), TimeUtils.getClock());
}
return workManager;
}
Expand All @@ -81,21 +91,13 @@ protected DynamicLoadFactor dynamicExtraLoadFactor() {
return dynamicLoadFactor;
}

private BrokerPollSystem brokerPollSystem;
private BrokerPollSystem<K, V> brokerPollSystem;

protected BrokerPollSystem<K, V> brokerPoller(AbstractParallelEoSStreamProcessor<K, V> pc) {
if (brokerPollSystem == null) {
// final ParallelEoSStreamProcessor<K, V> pc = pc();
brokerPollSystem = new BrokerPollSystem<>(consumerManager(), workManager(), pc, options());
}
return brokerPollSystem;
}

public Supplier<AbstractParallelEoSStreamProcessor<K, V>> pcSupplier() {
return this::pc;
}

public Consumer<K, V> consumer() {
return optionsInstance.getConsumer();
}
}
Loading

0 comments on commit 32ae364

Please sign in to comment.