Fix/refactor metrics singleton #630

Merged · 5 commits · Sep 1, 2023
CHANGELOG.adoc: 8 changes (2 additions & 6 deletions)
@@ -17,14 +17,10 @@ endif::[]

=== Fixes

* fix: Add synchronization to ensure proper initialization and closing of PCMetrics singleton. (#617)

== 0.5.2.7

=== Fixes

* fix: Return cached pausedPartitionSet (#618)
* fix: Parallel consumer stops processing data sometimes (#606)
* fix: Add synchronization to ensure proper initialization and closing of PCMetrics singleton. (#617)
* fix: Refactor metrics implementation to not use a singleton - improves meter separation and allows the metrics subsystem to operate correctly when multiple parallel consumer instances are running in the same Java process (#630); fixes (#617), improves on (#627)

== 0.5.2.6
=== Improvements
README.adoc: 19 changes (11 additions & 8 deletions)
@@ -1192,29 +1192,31 @@ Following example illustrates setup of Parallel Consumer with Meter Registry and
----
ParallelStreamProcessor<String, String> setupParallelConsumer() {
    Consumer<String, String> kafkaConsumer = getKafkaConsumer();

    String instanceId = UUID.randomUUID().toString();
    var options = ParallelConsumerOptions.<String, String>builder()
            .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
            .maxConcurrency(1000)
            .consumer(kafkaConsumer)
            .meterRegistry(meterRegistry) //<1>
            .metricsTags(Tags.of(Tag.of("instance", "pc1"))) //<2>
            .metricsTags(Tags.of(Tag.of("common-tag", "tag1"))) //<2>
            .pcInstanceTag(instanceId) //<3>
            .build();

    ParallelStreamProcessor<String, String> eosStreamProcessor =
            ParallelStreamProcessor.createEosStreamProcessor(options);

    eosStreamProcessor.subscribe(of(inputTopic));

    kafkaClientMetrics = new KafkaClientMetrics(kafkaConsumer); //<3>
    kafkaClientMetrics.bindTo(meterRegistry); //<4>
    kafkaClientMetrics = new KafkaClientMetrics(kafkaConsumer); //<4>
    kafkaClientMetrics.bindTo(meterRegistry); //<5>
    return eosStreamProcessor;
}
----
<1> - Meter Registry is set through ParallelConsumerOptions.builder(); if not specified, it defaults to CompositeMeterRegistry, which is a no-op.
<2> - Optional - "instance" tag with value "pc1" is set through the same builder; it will be added to all Parallel Consumer meters.
<3> - Optional - Kafka Consumer Micrometer metrics object created for the Kafka Consumer that is later used by the Parallel Consumer.
<4> - Optional - Kafka Consumer Micrometer metrics are bound to the Meter Registry.
<2> - Optional - common tags can be specified through the same builder; they will be added to all Parallel Consumer meters.
<3> - Optional - an instance tag value can be specified; it has to be unique to ensure meter uniqueness when multiple parallel consumer instances are recording metrics to the same meter registry. If the instance tag is not specified, a unique UUID value is generated and used. The tag is created with tag key 'pcinstance'.
<4> - Optional - Kafka Consumer Micrometer metrics object created for the Kafka Consumer that is later used by the Parallel Consumer.
<5> - Optional - Kafka Consumer Micrometer metrics are bound to the Meter Registry.

NOTE:: any additional binders / metrics need to be cleaned up appropriately - for example, the Kafka Consumer metrics registered above need to be closed using `kafkaClientMetrics.close()` after shutting down the Parallel Consumer, as the Parallel Consumer will close the Kafka Consumer on shutdown.
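A minimal cleanup sketch consistent with this note (assuming `eosStreamProcessor` and `kafkaClientMetrics` from the example above are kept as fields):

----
void shutdown() {
    // Parallel Consumer closes the underlying Kafka Consumer itself...
    eosStreamProcessor.close();
    // ...but not externally registered binders, so close them separately.
    kafkaClientMetrics.close();
}
----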

@@ -1512,13 +1514,14 @@ NOTE:: Dependency version bumps are not listed here.
ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.7

=== Fixes

* fix: Return cached pausedPartitionSet (#618)
* fix: Parallel consumer stops processing data sometimes (#606)
* fix: Add synchronization to ensure proper initialization and closing of PCMetrics singleton. (#617)
* fix: Refactor metrics implementation to not use a singleton - improves meter separation and allows the metrics subsystem to operate correctly when multiple parallel consumer instances are running in the same Java process (#630); fixes (#617), improves on (#627)

== 0.5.2.6
=== Improvements
ParallelConsumerOptions.java
@@ -5,6 +5,7 @@
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.metrics.PCMetricsDef;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
@@ -20,6 +21,7 @@

import java.time.Duration;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;

import static io.confluent.csid.utils.StringUtils.msg;
@@ -77,10 +79,22 @@ public class ParallelConsumerOptions<K, V> {

/**
* Micrometer MeterRegistry
* <p>
* Optional - if not specified, CompositeMeterRegistry will be used, which is a no-op
*/
@Builder.Default
private final MeterRegistry meterRegistry = new CompositeMeterRegistry();
private final MeterRegistry meterRegistry;

/**
* PC instance metrics tag value - if specified, it should be unique to allow instance-specific meters to be created
* and cleared. Used with Tag key {@link PCMetricsDef#PC_INSTANCE_TAG}.
* <p>
* If not set, a unique UUID will be generated for it.
*/
private final String pcInstanceTag;

/**
* Additional common metrics tags - will be added to all created meters
*/
@Builder.Default
private final Iterable<Tag> metricsTags = Tags.empty();
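
The two options above combine as follows when several instances record to one registry; a sketch under the assumption that `consumerA` and `consumerB` are configured Kafka consumers:

----
MeterRegistry shared = new SimpleMeterRegistry();

// Distinct pcInstanceTag values keep same-named meters from the two
// instances apart in the shared registry, and let each instance remove
// only its own meters on close.
var optionsA = ParallelConsumerOptions.<String, String>builder()
        .consumer(consumerA)
        .meterRegistry(shared)
        .pcInstanceTag("pc-a")
        .build();
var optionsB = ParallelConsumerOptions.<String, String>builder()
        .consumer(consumerB)
        .meterRegistry(shared)
        .pcInstanceTag("pc-b")
        .build();
----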

@@ -331,8 +345,9 @@ public enum InvalidOffsetMetadataHandlingPolicy {
}

/**
* Controls the error handling behaviour to use when invalid offsets metadata from a pre-existing consumer group is encountered.
* A potential scenario where this could occur is when a consumer group id from a Kafka Streams application is accidentally reused.
* Controls the error handling behaviour to use when invalid offsets metadata from a pre-existing consumer group is
* encountered. A potential scenario where this could occur is when a consumer group id from a Kafka Streams
* application is accidentally reused.
* <p>
* Default is {@link InvalidOffsetMetadataHandlingPolicy#FAIL}
*/
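
As a usage sketch, the policy would be set through the same options builder; the `invalidOffsetMetadataPolicy` accessor name is an assumption, not confirmed by this diff:

----
var options = ParallelConsumerOptions.<String, String>builder()
        .consumer(kafkaConsumer)
        // Assumed builder method for the field this Javadoc documents;
        // the default is InvalidOffsetMetadataHandlingPolicy.FAIL.
        .invalidOffsetMetadataPolicy(InvalidOffsetMetadataHandlingPolicy.IGNORE)
        .build();
----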
@@ -479,15 +494,16 @@ public boolean isProducerSupplied() {
}

/**
* Timeout for shutting down execution pool during shutdown in DONT_DRAIN mode. Should be high enough to allow
* for inflight messages to finish processing, but low enough to kill any blocked thread to allow to rebalance in a timely manner,
* especially if shutting down on error.
* Timeout for shutting down execution pool during shutdown in DONT_DRAIN mode. Should be high enough to allow for
* inflight messages to finish processing, but low enough to kill any blocked thread to allow to rebalance in a
* timely manner, especially if shutting down on error.
*/
@Builder.Default
public final Duration shutdownTimeout = Duration.ofSeconds(10);

/**
* Timeout for draining queue during shutdown in DRAIN mode. Should be high enough to allow for all queued messages to process.
* Timeout for draining queue during shutdown in DRAIN mode. Should be high enough to allow for all queued messages
* to process.
*/
@Builder.Default
public final Duration drainTimeout = Duration.ofSeconds(30);
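
For illustration, both timeouts are plain builder options generated from the fields above; the values here are arbitrary:

----
var options = ParallelConsumerOptions.<String, String>builder()
        .consumer(kafkaConsumer)
        // Give in-flight work more time before blocked threads are killed.
        .shutdownTimeout(Duration.ofSeconds(30))
        // Give the queue enough time to empty in DRAIN mode.
        .drainTimeout(Duration.ofMinutes(2))
        .build();
----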
AbstractParallelEoSStreamProcessor.java
@@ -43,7 +43,7 @@
import static io.confluent.csid.utils.BackportUtils.toSeconds;
import static io.confluent.csid.utils.StringUtils.msg;
import static io.confluent.parallelconsumer.internal.State.*;
import static io.confluent.parallelconsumer.metrics.PCMetricsDef.METER_PREFIX;
import static io.confluent.parallelconsumer.metrics.PCMetricsDef.USER_FUNCTION_EXECUTOR_PREFIX;
import static java.lang.Boolean.TRUE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -260,6 +260,8 @@ public Exception getFailureCause() {

private Duration drainTimeout;

private PCMetrics pcMetrics;

protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions) {
this(newOptions, new PCModule<>(newOptions));
}
@@ -286,7 +288,7 @@ protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOp
newOptions.getConsumer().groupMetadata().groupId(),
newOptions);
//Initialize global metrics - should be initialized before any of the module objects are created so that meters can be bound in them.
PCMetrics.initialize(options.getMeterRegistry(), options.getMetricsTags());
pcMetrics = module.pcMetrics();

this.dynamicExtraLoadFactor = module.dynamicExtraLoadFactor();

@@ -311,13 +313,13 @@ protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOp
}

private void initMetrics() {
this.userProcessingTimer = PCMetrics.getInstance().getTimerFromMetricDef(PCMetricsDef.USER_FUNCTION_PROCESSING_TIME);
this.loadFactorGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.DYNAMIC_EXTRA_LOAD_FACTOR,
this.userProcessingTimer = pcMetrics.getTimerFromMetricDef(PCMetricsDef.USER_FUNCTION_PROCESSING_TIME);
this.loadFactorGauge = pcMetrics.gaugeFromMetricDef(PCMetricsDef.DYNAMIC_EXTRA_LOAD_FACTOR,
dynamicExtraLoadFactor, DynamicLoadFactor::getCurrentFactor);
this.statusGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PC_STATUS, this, pc -> pc.state.getValue());
this.statusGauge = pcMetrics.gaugeFromMetricDef(PCMetricsDef.PC_STATUS, this, pc -> pc.state.getValue());
new ExecutorServiceMetrics(this.getWorkerThreadPool().get(), "pc-user-function-executor",
METER_PREFIX,
this.options.getMetricsTags()).bindTo(PCMetrics.getInstance().getMeterRegistry());
USER_FUNCTION_EXECUTOR_PREFIX,
pcMetrics.getCommonTags()).bindTo(pcMetrics.getMeterRegistry());
}
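
Because every meter is now registered per instance, a shared registry can be queried by instance tag; a hedged sketch (the meter name is hypothetical, only the 'pcinstance' tag key is documented in the README change above):

----
// Hypothetical meter name; the tag key matches the README documentation.
Timer timer = meterRegistry.find("pc.user.function.processing.time")
        .tag("pcinstance", instanceId)
        .timer();
----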

private void validateConfiguration() {
@@ -626,7 +628,8 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
maybeCloseConsumer();

producerManager.ifPresent(x -> x.close(timeout));
PCMetrics.close();
deregisterMeters();
pcMetrics.close();
log.debug("Close complete.");
this.state = CLOSED;

@@ -635,6 +638,13 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
}
}

/**
* De-registers and removes user function executor meters from meter registry on shutdown
*/
private void deregisterMeters() {
pcMetrics.removeMetersByPrefixAndCommonTags(USER_FUNCTION_EXECUTOR_PREFIX);
}
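
The diff does not include `removeMetersByPrefixAndCommonTags` itself; a hypothetical sketch of what such a method could do with plain Micrometer API (field names assumed):

----
// Hypothetical: remove every meter whose name starts with the prefix and
// whose tags contain this instance's common tags (including the instance tag).
void removeMetersByPrefixAndCommonTags(String prefix) {
    meterRegistry.getMeters().stream()
            .filter(meter -> meter.getId().getName().startsWith(prefix))
            .filter(meter -> meter.getId().getTags().containsAll(commonTags))
            .forEach(meterRegistry::remove);
}
----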

/**
* To keep things simple, make sure the correct thread which can make a commit, is the one to close the consumer.
* This way, if partitions are revoked, the commit can be made inline.
BrokerPollSystem.java
@@ -68,6 +68,8 @@ public class BrokerPollSystem<K, V> implements OffsetCommitter {

private final WorkManager<K, V> wm;

private final PCMetrics pcMetrics;

private Gauge statusGauge;
private Gauge numPausedPartitionsGauge;

@@ -83,12 +85,13 @@ public BrokerPollSystem(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm,
committer = Optional.of(consumerCommitter);
}
}
pcMetrics = pc.getModule().pcMetrics();
initMetrics();
}

private void initMetrics() {
statusGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.PC_POLLER_STATUS, this, poller -> poller.runState.getValue());
numPausedPartitionsGauge = PCMetrics.getInstance().gaugeFromMetricDef(PCMetricsDef.NUM_PAUSED_PARTITIONS,
statusGauge = pcMetrics.gaugeFromMetricDef(PCMetricsDef.PC_POLLER_STATUS, this, poller -> poller.runState.getValue());
numPausedPartitionsGauge = pcMetrics.gaugeFromMetricDef(PCMetricsDef.NUM_PAUSED_PARTITIONS,
this.consumerManager, ConsumerManager::getPausedPartitionSize);
}

PCModule.java
@@ -7,6 +7,7 @@
import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.metrics.PCMetrics;
import io.confluent.parallelconsumer.state.WorkManager;
import lombok.Setter;
import org.apache.kafka.clients.consumer.Consumer;
@@ -106,4 +107,13 @@ protected BrokerPollSystem<K, V> brokerPoller(AbstractParallelEoSStreamProcessor
public Clock clock() {
return TimeUtils.getClock();
}

private PCMetrics pcMetrics;

public PCMetrics pcMetrics() {
if (pcMetrics == null) {
pcMetrics = new PCMetrics(options().getMeterRegistry(), optionsInstance.getMetricsTags(), optionsInstance.getPcInstanceTag());
}
return pcMetrics;
}
}
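
An illustration of the wiring this lazy accessor enables (hypothetical test-style usage; `options` stands for a configured ParallelConsumerOptions):

----
PCModule<String, String> module = new PCModule<>(options);

// Components such as the broker poller and stream processor resolve metrics
// through the module, so they share one per-instance PCMetrics object
// instead of a process-wide singleton.
PCMetrics metrics = module.pcMetrics();
assert metrics == module.pcMetrics(); // created once, then reused
----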