
Commit

Removes object creation from metrics calculation
nachomdo committed Jan 6, 2023
1 parent 32587d2 commit 000bd95
Showing 6 changed files with 17 additions and 115 deletions.

This file was deleted.

@@ -9,7 +9,13 @@
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.internal.PCModule;
import io.confluent.parallelconsumer.internal.ProducerManager;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -166,8 +172,12 @@ public void pollAndProduce(Function<PollContext<K, V>, ProducerRecord<K, V>> use

@Override
public void registerMetricsTracker(MeterBinder binder) {
binder.bindTo(getModule().meterRegistry());
var metricsRegistry = getModule().meterRegistry();

UniLists.of(binder, new JvmMemoryMetrics(), new JvmThreadMetrics(), new ProcessorMetrics(), new JvmGcMetrics(),
new ExecutorServiceMetrics(this.getWorkerThreadPool(), "pc-user-function-executor", Tags.empty()))
.forEach(meterBinder -> meterBinder.bindTo(metricsRegistry));

getModule().eventBus().register(binder);
}

}
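
For context, a minimal sketch of the registration path callers get after this change. It reuses only names visible in this diff (ParallelConsumerOptions, meterRegistry, registerMetricsTracker, PCMetricsTracker, calculateMetricsWithIncompletes, UniLists, Tag); the createEosStreamProcessor factory is the library's public API rather than part of this commit, and the consumer and registry construction are assumed to happen elsewhere, as in the Prometheus example further down.

// Hedged sketch, not part of this commit: application-side wiring after the change.
// Assumes kafkaConsumer and metricsRegistry are built elsewhere (see the example class below).
var options = ParallelConsumerOptions.<String, String>builder()
        .consumer(kafkaConsumer)
        .meterRegistry(metricsRegistry)   // registry that all binders attach to
        .build();

var processor = ParallelStreamProcessor.createEosStreamProcessor(options);

// A single call now binds the tracker together with the JVM, processor and
// worker-pool ExecutorService meters; callers no longer create those binders themselves.
processor.registerMetricsTracker(new PCMetricsTracker(
        processor::calculateMetricsWithIncompletes,
        UniLists.of(Tag.of("instance", "pc1"))));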
@@ -10,11 +10,6 @@
import io.confluent.parallelconsumer.state.WorkManager;
import io.micrometer.core.instrument.binder.BaseUnits;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -107,6 +102,7 @@ public Duration getTimeBetweenCommits() {
/**
* The pool which is used for running the users' supplied function
*/
@Getter(PROTECTED)
protected final ThreadPoolExecutor workerThreadPool;

private Optional<Future<Boolean>> controlThreadFuture = Optional.empty();
@@ -1229,28 +1225,6 @@ public PCMetrics calculateMetrics() {

getWm().addMetricsTo(metrics);

//TODO: should not create objects in metrics calculator method
{
new JvmMemoryMetrics().bindTo(metricsRegistry);
new JvmThreadMetrics().bindTo(metricsRegistry);
new ProcessorMetrics().bindTo(metricsRegistry);


// need to add closables to close system on shutdown
{
new JvmGcMetrics().bindTo(metricsRegistry);
new KafkaClientMetrics(consumer).bindTo(metricsRegistry);
// can bind to wrapper?
//new KafkaClientMetrics(producerManager.get().producerWrapper).bindTo(metricsRegistry);

// we don't use an admin client?
// new KafkaClientMetrics(admin).bindTo(metricsRegistry);

// can't for logback?
//new LogbackMetrics().bindTo(metricsRegistry);
}
}

return metrics.build();
}

@@ -9,10 +9,6 @@
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmHeapPressureMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
@@ -106,13 +102,11 @@ protected void postSetup() {
@SuppressWarnings({"FeatureEnvy", "MagicNumber"})
ParallelStreamProcessor<String, String> setupParallelConsumer() {
Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <1>
Producer<String, String> kafkaProducer = getKafkaProducer();

var options = ParallelConsumerOptions.<String, String>builder()
.ordering(ParallelConsumerOptions.ProcessingOrder.KEY) // <2>
.maxConcurrency(1000) // <3>
.consumer(kafkaConsumer)
.producer(kafkaProducer)
.meterRegistry(metricsRegistry)
.build();

@@ -121,12 +115,13 @@ ParallelStreamProcessor<String, String> setupParallelConsumer() {

eosStreamProcessor.subscribe(of(inputTopic)); // <4>

new KafkaClientMetrics(kafkaConsumer).bindTo(metricsRegistry);

pcMetricsTracker = new PCMetricsTracker(eosStreamProcessor::calculateMetricsWithIncompletes,
UniLists.of(Tag.of("region", "eu-west-1"), Tag.of("instance", "pc1")));

eosStreamProcessor.registerMetricsTracker(pcMetricsTracker);
UniLists.of(new KafkaClientMetrics(kafkaConsumer), new JvmMemoryMetrics(),
new JvmHeapPressureMetrics(), new JvmGcMetrics(), new JvmThreadMetrics())
.stream().forEach(m -> m.bindTo(metricsRegistry));


return eosStreamProcessor;
}
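
As a side note on how these meters surface: the example binds everything to a PrometheusMeterRegistry (see the imports above), and a registry of that type can be read back in Prometheus exposition format. A hedged one-liner using standard Micrometer, not anything introduced by this commit:

// Assumption: metricsRegistry is the PrometheusMeterRegistry this example builds elsewhere.
String exposition = metricsRegistry.scrape();   // all bound meters, Prometheus text format
System.out.println(exposition);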
@@ -11,10 +11,7 @@
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
@@ -90,12 +87,6 @@ Consumer<String, String> getKafkaConsumer() {
return mockConsumer;
}

@Override
Producer<String, String> getKafkaProducer() {
var stringSerializer = Serdes.String().serializer();
return new MockProducer<>(true, stringSerializer, stringSerializer);
}

@Override
protected void postSetup() {
super.postSetup();
pom.xml: 1 change (0 additions & 1 deletion)
@@ -34,7 +34,6 @@

<modules>
<module>parallel-consumer-core</module>
<module>parallel-consumer-metrics</module>
<module>parallel-consumer-vertx</module>
<module>parallel-consumer-reactor</module>
<module>parallel-consumer-examples</module>
