
Commit

Fix tests after merging master
nachomdo committed Jan 5, 2023
1 parent 42f4b3c commit e3b56f8
Showing 6 changed files with 92 additions and 34 deletions.
@@ -5,7 +5,6 @@
*/

import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.BaseUnits;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.SuperBuilder;
@@ -23,7 +22,7 @@ public enum MetricsType { COUNTER, TIMER, GAUGE }
Double value;

Duration timerValue;
BaseUnits unit;
String unit;
@Builder.Default
Tags tags = Tags.empty();
@Builder.Default
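This first file narrows the MetricsEvent unit field from Micrometer's BaseUnits type to a plain String. BaseUnits is only a holder of String constants (for example BaseUnits.MILLISECONDS), so call sites can keep passing those constants while the event class itself stays framework-agnostic. A minimal sketch of building such an event with the Lombok builder used later in this diff; the metric name is made up for illustration:

import io.micrometer.core.instrument.binder.BaseUnits;
import java.time.Duration;

MetricsEvent event = MetricsEvent.builder()
        .name("my.custom.processing.time")      // hypothetical name, not one of the PCMetricsTracker constants
        .type(MetricsEvent.MetricsType.TIMER)
        .unit(BaseUnits.MILLISECONDS)           // a BaseUnits String constant, stored directly in the new String field
        .timerValue(Duration.ofMillis(42))
        .build();                               // tags default to Tags.empty() via @Builder.Default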
@@ -1,15 +1,13 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.state.ShardKey;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.SuperBuilder;
@@ -36,10 +34,6 @@ public class PCMetrics {

PollerMetrics pollerMetrics;

Timer functionTimer;

Counter successCounter;

/**
* The number of partitions assigned to this consumer
*/
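With functionTimer and successCounter dropped from the PCMetrics snapshot, those meters now live only in the Micrometer registry and are looked up by name. A sketch of reading the user-function timer back, assuming a MeterRegistry (meterRegistry) and an SLF4J logger (log) in scope, plus the PCMetricsTracker constant referenced later in this commit:

import io.micrometer.core.instrument.Timer;
import java.util.concurrent.TimeUnit;

Timer userFunctionTimer = meterRegistry
        .find(PCMetricsTracker.METRIC_NAME_USER_FUNCTION_PROCESSING_TIME)
        .timer();                               // null until the meter has been registered
if (userFunctionTimer != null) {
    log.info("mean user function time: {} ms", userFunctionTimer.mean(TimeUnit.MILLISECONDS));
}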
@@ -1,13 +1,20 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.csid.utils.TimeUtils;
import io.confluent.parallelconsumer.*;
import io.confluent.parallelconsumer.state.WorkContainer;
import io.confluent.parallelconsumer.state.WorkManager;
import io.micrometer.core.instrument.binder.BaseUnits;
import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -224,6 +231,9 @@ public Exception getFailureCause() {

private final RateLimiter queueStatsLimiter = new RateLimiter();

@Getter(PROTECTED)
PCModule<K,V> module;

/**
* Control for stepping loading factor - shouldn't step if work requests can't be fulfilled due to restrictions.
* (e.g. we may want 10, but maybe there's a single partition and we're in partition mode - stepping up won't
@@ -243,7 +253,7 @@ protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOp
*/
protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions, PCModule<K, V> module) {
Objects.requireNonNull(newOptions, "Options must be supplied");

this.module = module;
options = newOptions;
this.consumer = options.getConsumer();

@@ -274,6 +284,10 @@ protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOp
this.producerManager = Optional.empty();
this.committer = this.brokerPollSubsystem;
}

new ExecutorServiceMetrics(workerThreadPool, "pc-worker-executor", Collections.emptyList())
.bindTo(module.meterRegistry());

}
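
// The constructor now binds Micrometer's ExecutorServiceMetrics to the module's registry, exposing the worker
// pool's queue depth, active count and task timings tagged with name="pc-worker-executor". A sketch of reading
// one of those meters back; the "executor.queued" meter name follows ExecutorServiceMetrics' conventions and
// should be checked against the Micrometer version in use (requires io.micrometer.core.instrument.Gauge):
Gauge queuedTasks = module.meterRegistry()
        .find("executor.queued")
        .tags("name", "pc-worker-executor")
        .gauge();
double tasksWaiting = queuedTasks == null ? 0 : queuedTasks.value();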

private void validateConfiguration() {
@@ -1158,8 +1172,12 @@ protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunct
log.debug("Pool found work from old generation of assigned work, skipping message as epoch doesn't match current {}", workContainerBatch);
return null;
}

resultsFromUserFunction = usersFunction.apply(context);
var resultWithDuration = TimeUtils.timeWithMeta(()->usersFunction.apply(context));
resultsFromUserFunction = resultWithDuration.getResult();
module.eventBus().post(MetricsEvent.builder().name(PCMetricsTracker.METRIC_NAME_USER_FUNCTION_PROCESSING_TIME)
.type(MetricsEvent.MetricsType.TIMER)
.unit(BaseUnits.MILLISECONDS)
.timerValue(resultWithDuration.getElapsed()).build());

for (final WorkContainer<K, V> kvWorkContainer : workContainerBatch) {
onUserFunctionSuccess(kvWorkContainer, resultsFromUserFunction);
@@ -1200,6 +1218,42 @@ protected <R> List<ParallelConsumer.Tuple<ConsumerRecord<K, V>, R>> runUserFunct
}
}
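
// In the hunk above, the user function is wrapped in TimeUtils.timeWithMeta(..) and its elapsed time is posted
// to the module's event bus as a TIMER MetricsEvent, to be recorded by the metrics tracker. For comparison, a
// sketch of the same measurement taken directly against a Micrometer Timer (not what this commit does; it
// routes the duration through the event bus instead; requires io.micrometer.core.instrument.Timer):
Timer directTimer = Timer.builder(PCMetricsTracker.METRIC_NAME_USER_FUNCTION_PROCESSING_TIME)
        .register(module.meterRegistry());
var directResults = directTimer.record(() -> usersFunction.apply(context));   // times the call, returns its result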

/**
* Gathers metrics from the {@link ParallelConsumer} for monitoring.
*/
public PCMetrics calculateMetrics() {
var metrics = PCMetrics.builder();
var metricsRegistry = module.meterRegistry();

addMetricsTo(metrics);

getWm().addMetricsTo(metrics);

//TODO: should not create objects in metrics calculator method
{
new JvmMemoryMetrics().bindTo(metricsRegistry);
new JvmThreadMetrics().bindTo(metricsRegistry);
new ProcessorMetrics().bindTo(metricsRegistry);


// need to add closeables so they can be closed on system shutdown
{
new JvmGcMetrics().bindTo(metricsRegistry);
new KafkaClientMetrics(consumer).bindTo(metricsRegistry);
// can bind to wrapper?
//new KafkaClientMetrics(producerManager.get().producerWrapper).bindTo(metricsRegistry);

// we don't use an admin client?
// new KafkaClientMetrics(admin).bindTo(metricsRegistry);

// can't for logback?
//new LogbackMetrics().bindTo(metricsRegistry);
}
}

return metrics.build();
}
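
// As the TODO above notes, calculateMetrics() currently constructs and binds these binders on every call, and
// JvmGcMetrics and KafkaClientMetrics are AutoCloseable, so ideally they are bound once and closed on shutdown.
// A sketch of that pattern, assuming the binder instances are kept as fields of the processor:
JvmGcMetrics gcMetrics = new JvmGcMetrics();
KafkaClientMetrics consumerClientMetrics = new KafkaClientMetrics(consumer);
gcMetrics.bindTo(module.meterRegistry());
consumerClientMetrics.bindTo(module.meterRegistry());
// ...and later, for example from close():
gcMetrics.close();
consumerClientMetrics.close();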

protected void addToMailBoxOnUserFunctionSuccess(PollContextInternal<K, V> context, WorkContainer<K, V> wc, List<?> resultsFromUserFunction) {
addToMailbox(context, wc);
}
@@ -1305,4 +1359,22 @@ public void resumeIfPaused() {
}
}

/**
* Like {@link #calculateMetrics()}, but additionally enriches the per-partition metrics with incomplete offset details.
*/
public PCMetrics calculateMetricsWithIncompletes() {
PCMetrics pcMetrics = calculateMetrics();
Map<TopicPartition, PCMetrics.PCPartitionMetrics> partitionMetrics = pcMetrics.getPartitionMetrics();

getWm().enrichWithIncompletes(partitionMetrics);

PCMetrics.PCMetricsBuilder<?, ?> metrics = pcMetrics.toBuilder();
return metrics.build();
}

protected void addMetricsTo(PCMetrics.PCMetricsBuilder<?, ? extends PCMetrics.PCMetricsBuilder<?, ?>> metrics) {
metrics.dynamicLoadFactor(dynamicExtraLoadFactor.getCurrentFactor());
metrics.pollerMetrics(brokerPollSubsystem.getMetrics());
}

}
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.internal;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.PCMetrics;
@@ -342,13 +342,13 @@ public void wakeupIfPaused() {
* Pause polling from the underlying Kafka Broker.
* <p>
* Note: If the poll system is currently not in state
* {@link io.confluent.parallelconsumer.internal.State#running running}, calling this method will be a no-op.
* {@link io.confluent.parallelconsumer.internal.State#RUNNING running}, calling this method will be a no-op.
* </p>
*/
public void pausePollingAndWorkRegistrationIfRunning() {
if (this.runState == State.running) {
if (this.runState == RUNNING) {
log.info("Transitioning broker poll system to state paused.");
this.runState = State.paused;
this.runState = PAUSED;
} else {
log.info("Skipping transition of broker poll system to state paused. Current state is {}.", this.runState);
}
@@ -358,13 +358,13 @@ public void pausePollingAndWorkRegistrationIfRunning() {
* Resume polling from the underlying Kafka Broker.
* <p>
* Note: If the poll system is currently not in state
* {@link io.confluent.parallelconsumer.internal.State#paused paused}, calling this method will be a no-op.
* {@link io.confluent.parallelconsumer.internal.State#PAUSED paused}, calling this method will be a no-op.
* </p>
*/
public void resumePollingAndWorkRegistrationIfPaused() {
if (this.runState == State.paused) {
if (this.runState == PAUSED) {
log.info("Transitioning broker poll system to state running.");
this.runState = State.running;
this.runState = RUNNING;
} else {
log.info("Skipping transition of broker poll system to state running. Current state is {}.", this.runState);
}
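The visible changes in this poller file are the renamed State constants (running and paused become RUNNING and PAUSED) and the copyright year; the pause and resume guards behave as before. A short usage sketch, assuming a BrokerPollSystem instance in scope:

brokerPollSystem.pausePollingAndWorkRegistrationIfRunning();   // no-op unless the poll system is RUNNING
// ... inspect or drain in-flight work ...
brokerPollSystem.resumePollingAndWorkRegistrationIfPaused();   // no-op unless the poll system is PAUSED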
@@ -1,13 +1,12 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.concurrent.TimeUnit;

import static io.confluent.parallelconsumer.ManagedTruth.assertThat;
import static org.awaitility.Awaitility.await;
@@ -44,7 +43,7 @@ void metricsBasics() {
assertThat(pcPartitionMetrics).getNumberOfIncompletes().isEqualTo(0);
});

final SimpleMeterRegistry metricsRegistry = parallelConsumer.getMetricsRegistry();
final SimpleMeterRegistry metricsRegistry = (SimpleMeterRegistry) this.getModule().meterRegistry();
metricsRegistry.getMeters().forEach(meter -> {
log.error("Meter: {}", meter);
});
@@ -61,11 +60,5 @@ void metricsBasics() {
var incompletes = incompleteMetrics.get().getIncompleteOffsets();
assertThat(incompletes).isEmpty();
});

{
PCMetrics metrics = parallelConsumer.calculateMetricsWithIncompletes();
log.warn("timer: {} ms", metrics.getFunctionTimer().mean(TimeUnit.MILLISECONDS));
log.warn("Counter: {}", metrics.getSuccessCounter().count());
}
}
}
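
The test now obtains the SimpleMeterRegistry through the module rather than through a getter on the consumer, and the block that read functionTimer and successCounter off the PCMetrics snapshot is gone along with those fields. A sketch of how that removed check could be re-expressed against the registry, assuming the helpers already used in this test and that PCMetricsTracker is imported:

var registry = (SimpleMeterRegistry) this.getModule().meterRegistry();
var timer = registry.find(PCMetricsTracker.METRIC_NAME_USER_FUNCTION_PROCESSING_TIME).timer();
if (timer != null) {
    log.info("user function timer: count={}, mean={} ms", timer.count(), timer.mean(TimeUnit.MILLISECONDS));
}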
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.State;
@@ -45,8 +45,8 @@ void metricsRegisterBinding() {
// metrics have some data
await().untilAsserted(() -> {
assertFalse(registry.getMeters().isEmpty());
assertEquals(State.running.ordinal(),
registeredGaugeValueFor(PCMetricsTracker.METRIC_NAME_PC_STATUS, "status", State.running.name()));
assertEquals(State.RUNNING.ordinal(),
registeredGaugeValueFor(PCMetricsTracker.METRIC_NAME_PC_STATUS, "status", State.RUNNING.name()));
assertEquals(1, registeredGaugeValueFor(PCMetricsTracker.METRIC_NAME_NUMBER_SHARDS));
assertEquals(2, registeredGaugeValueFor(PCMetricsTracker.METRIC_NAME_NUMBER_PARTITIONS));
});
@@ -64,7 +64,7 @@ void metricsRegisterBinding() {

assertEquals(10_000, registeredCounterValueFor(PCMetricsTracker.METRIC_NAME_PROCESSED_RECORDS,
"epoch", "0", "topic", topicPartition.topic(), "partition", String.valueOf(topicPartition.partition())));
assertTrue(0 <= registeredTimerFor(PCMetricsTracker.METRIC_NAME_USER_FUNCTION_PROCESSING_TIME));
assertTrue(0.0 < registeredTimerFor(PCMetricsTracker.METRIC_NAME_USER_FUNCTION_PROCESSING_TIME));
});
}

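The final test keys the status gauge assertion off State.RUNNING after the enum rename and tightens the timer assertion from at-least-zero to strictly positive, so it now fails if no user-function sample was ever recorded. For reference, a sketch of the kind of status gauge the assertion expects, assumed to mirror what PCMetricsTracker registers (the tracker itself is not part of this diff):

Gauge.builder(PCMetricsTracker.METRIC_NAME_PC_STATUS, () -> State.RUNNING.ordinal())
        .tag("status", State.RUNNING.name())
        .register(registry);   // the real tracker presumably supplies the live state rather than a constant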
