diff --git a/parallel-consumer-core/pom.xml b/parallel-consumer-core/pom.xml
index 7e9310078..c92cedf19 100644
--- a/parallel-consumer-core/pom.xml
+++ b/parallel-consumer-core/pom.xml
@@ -134,6 +134,8 @@
io.confluent.parallelconsumer.state.ProcessingShard
io.confluent.parallelconsumer.state.ShardKey
io.confluent.parallelconsumer.offsets.OffsetEncoding
+ io.confluent.parallelconsumer.PCMetrics
+ io.confluent.parallelconsumer.Offsets
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java
new file mode 100644
index 000000000..f25a566e3
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/Offsets.java
@@ -0,0 +1,62 @@
+package io.confluent.parallelconsumer;
+
+/*-
+ * Copyright (C) 2020-2022 Confluent, Inc.
+ */
+
+import lombok.Getter;
+import lombok.ToString;
+import lombok.Value;
+import lombok.experimental.Delegate;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.stream.Collectors;
+
+/**
+ * A collection of Kafka {@link Offset}s.
+ */
+@ToString
+public class Offsets {
+
+ @Getter
+ @Delegate
+ private final List<Long> rawOffsets;
+
+ public Offsets(List<Long> records) {
+ this.rawOffsets = records;
+ }
+
+ public static Offsets fromRecords(List<RecordContext<?, ?>> records) {
+ return fromLongs(records.stream()
+ .map(RecordContext::offset)
+ .collect(Collectors.toUnmodifiableList()));
+ }
+
+ // due to type erasure, can't use method overloading
+ public static Offsets fromLongs(List<Long> rawOffsetsIn) {
+ return new Offsets(rawOffsetsIn);
+ }
+
+ public static Offsets fromArray(long... rawOffsetsIn) {
+ return new Offsets(Arrays.stream(rawOffsetsIn).boxed().collect(Collectors.toList()));
+ }
+
+ public static Offsets from(SortedSet<Long> incompleteOffsetsBelowHighestSucceeded) {
+ return new Offsets(new ArrayList<>(incompleteOffsetsBelowHighestSucceeded));
+ }
+
+ /**
+ * Value class for a Kafka offset, to avoid being Longly typed.
+ */
+ @Value
+ public static class Offset {
+ long value;
+
+ public static Offset of(Long offset) {
+ return new Offset(offset);
+ }
+ }
+}
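
A quick usage sketch of the new Offsets wrapper (a hand-rolled example, not taken from this PR - the Lombok @Delegate means the List<Long> API is available directly on the wrapper):

    import io.confluent.parallelconsumer.Offsets;
    import java.util.List;

    class OffsetsUsageSketch {
        public static void main(String[] args) {
            // the factory methods are equivalent ways to wrap raw offsets
            Offsets fromVarargs = Offsets.fromArray(0, 1, 5);
            Offsets fromList = Offsets.fromLongs(List.of(0L, 1L, 5L));

            // @Delegate exposes List methods such as size(), get(), isEmpty()
            System.out.println(fromList.size());    // 3
            System.out.println(fromVarargs.get(0)); // 0
        }
    }
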
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCMetrics.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCMetrics.java
new file mode 100644
index 000000000..94cda8184
--- /dev/null
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/PCMetrics.java
@@ -0,0 +1,113 @@
+package io.confluent.parallelconsumer;
+
+import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
+import io.confluent.parallelconsumer.internal.State;
+import io.confluent.parallelconsumer.offsets.OffsetEncoding;
+import io.confluent.parallelconsumer.state.ShardKey;
+import lombok.Builder;
+import lombok.Value;
+import lombok.experimental.SuperBuilder;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Metrics model for Parallel Consumer
+ *
+ * @author Antony Stubbs
+ */
+@Value
+@SuperBuilder(toBuilder = true)
+public class PCMetrics {
+
+ long dynamicLoadFactor;
+
+ Map<TopicPartition, PCPartitionMetrics> partitionMetrics;
+
+ Map<ShardKey, ShardMetrics> shardMetrics;
+
+ PollerMetrics pollerMetrics;
+
+ /**
+ * The number of partitions assigned to this consumer
+ */
+ public long getNumberOfPartitions() {
+ return partitionMetrics.size();
+ }
+
+ /**
+ * The number of shards (queues) currently managed - depends on ordering and data key set (roughly one shard per observed key under key ordering, otherwise one per assigned partition)
+ */
+ public long getNumberOfShards() {
+ return shardMetrics.size();
+ }
+
+ public long getTotalNumberOfIncompletes() {
+ return partitionMetrics.values().stream()
+ .mapToLong(PCPartitionMetrics::getNumberOfIncompletes)
+ .sum();
+ }
+
+ @Value
+ @SuperBuilder(toBuilder = true)
+ public static class PCPartitionMetrics {
+ TopicPartition topicPartition;
+ long lastCommittedOffset;
+ long numberOfIncompletes;
+ long highestCompletedOffset;
+ long highestSeenOffset;
+ long highestSequentialSucceededOffset;
+ long epoch;
+ CompressionStats compressionStats;
+
+ /**
+ * @see AbstractParallelEoSStreamProcessor#calculateMetricsWithIncompletes()
+ */
+ @Builder.Default
+ Optional<IncompleteMetrics> incompleteMetrics = Optional.empty();
+
+ public long getTraditionalConsumerLag() {
+ return highestSeenOffset - highestSequentialSucceededOffset;
+ }
+
+ @Value
+ public static class IncompleteMetrics {
+ Offsets incompleteOffsets;
+ }
+
+ @Value
+ @SuperBuilder(toBuilder = true)
+ public static class CompressionStats {
+ long offsetsEncodedPerBit;
+ long offsetsEncodedPerByte;
+ long bytesUsedForEncoding;
+ double fractionOfEncodingSpaceUsed;
+ OffsetEncoding bestEncoding;
+ }
+ }
+
+ @Value
+ @SuperBuilder(toBuilder = true)
+ public static class ShardMetrics {
+ ShardKey shardKey;
+ /**
+ * Number of records queued for processing in this shard
+ */
+ long shardSize;
+ long averageUserProcessingTime;
+ long averageTimeSpentInQueue;
+ }
+
+ /**
+ * Metrics for the {@link io.confluent.parallelconsumer.internal.BrokerPollSystem} sub-system.
+ */
+ @Value
+ @SuperBuilder(toBuilder = true)
+ public static class PollerMetrics {
+ State state;
+ boolean paused;
+ Map<TopicPartition, Boolean> pausedPartitions;
+ }
+
+}
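
To make the model above concrete, here is how a caller is expected to read it (a sketch - `pc` is an assumed, already-running processor, not something this PR constructs):

    import io.confluent.parallelconsumer.PCMetrics;
    import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;

    class MetricsReadSketch {
        static void dump(ParallelEoSStreamProcessor<String, String> pc) {
            PCMetrics metrics = pc.calculateMetrics();

            // top-level rollups derived from the per-partition / per-shard maps
            System.out.println("partitions: " + metrics.getNumberOfPartitions());
            System.out.println("shards: " + metrics.getNumberOfShards());
            System.out.println("incompletes: " + metrics.getTotalNumberOfIncompletes());

            // per-partition lag: highest seen minus highest sequentially succeeded
            metrics.getPartitionMetrics().forEach((tp, pm) ->
                    System.out.printf("%s lag=%d epoch=%d%n",
                            tp, pm.getTraditionalConsumerLag(), pm.getEpoch()));
        }
    }
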
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
index 8f3b2bbcb..08584ec26 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java
@@ -1295,4 +1295,36 @@ public void resumeIfPaused() {
}
}
+
+ /**
+ * Gathers a snapshot of metrics from the {@link ParallelConsumer}, for monitoring.
+ */
+ public PCMetrics calculateMetrics() {
+ var metrics = PCMetrics.builder();
+
+ addMetricsTo(metrics);
+
+ getWm().addMetricsTo(metrics);
+
+ return metrics.build();
+ }
+
+ /**
+ * As {@link #calculateMetrics()}, but additionally gathers the incomplete offsets for each partition - more expensive, as the incomplete offset sets must be copied out.
+ */
+ public PCMetrics calculateMetricsWithIncompletes() {
+ PCMetrics pcMetrics = calculateMetrics();
+ Map<TopicPartition, PCMetrics.PCPartitionMetrics> partitionMetrics = pcMetrics.getPartitionMetrics();
+
+ getWm().enrichWithIncompletes(partitionMetrics);
+
+ PCMetrics.PCMetricsBuilder<?, ?> metrics = pcMetrics.toBuilder();
+ return metrics.build();
+ }
+
+ protected void addMetricsTo(PCMetrics.PCMetricsBuilder<?, ? extends PCMetrics.PCMetricsBuilder<?, ?>> metrics) {
+ metrics.dynamicLoadFactor(dynamicExtraLoadFactor.getCurrentFactor());
+ metrics.pollerMetrics(brokerPollSubsystem.getMetrics());
+ }
+
}
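
Since calculateMetrics() is a pull-style snapshot, callers poll it themselves; a minimal monitoring loop might look like the following (the scheduler and interval are assumptions of this sketch, not part of the PR):

    import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class MetricsPoller {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        void start(ParallelEoSStreamProcessor<String, String> pc) {
            scheduler.scheduleAtFixedRate(() -> {
                // cheap snapshot; calculateMetricsWithIncompletes() also materialises
                // the incomplete offset sets, so use that variant sparingly
                var metrics = pc.calculateMetrics();
                System.out.println("load factor: " + metrics.getDynamicLoadFactor()
                        + ", incompletes: " + metrics.getTotalNumberOfIncompletes());
            }, 0, 10, TimeUnit.SECONDS);
        }
    }
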
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
index 0ceae0031..5f014086c 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/BrokerPollSystem.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.state.WorkManager;
@@ -367,4 +368,14 @@ public void resumePollingAndWorkRegistrationIfPaused() {
log.info("Skipping transition of broker poll system to state running. Current state is {}.", this.runState);
}
}
+
+ public PCMetrics.PollerMetrics getMetrics() {
+ var b = PCMetrics.PollerMetrics.builder();
+ b.state(runState);
+ b.paused(isPausedForThrottling());
+
+ // TODO: populate the paused partition details
+// b.pausedPartitions();
+ return b.build();
+ }
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
index 9a9e92df6..c872c3717 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java
@@ -4,6 +4,8 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.Offsets;
+import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
@@ -629,5 +631,30 @@ boolean checkIfWorkIsStale(final WorkContainer<?, ?> workContainer) {
return false;
}
-}
+ public PCMetrics.PCPartitionMetrics getMetrics() {
+ var b = PCMetrics.PCPartitionMetrics.builder();
+ b.topicPartition(getTp());
+
+ //
+ b.highestSeenOffset(getOffsetHighestSeen());
+ b.epoch(getPartitionsAssignmentEpoch());
+ b.highestCompletedOffset(getOffsetHighestSucceeded());
+ b.numberOfIncompletes(incompleteOffsets.size());
+ // TODO: wire up compression stats once implemented
+// b.compressionStats(getCompressionStats());
+
+ //
+ return b.build();
+ }
+
+ private PCMetrics.PCPartitionMetrics.CompressionStats getCompressionStats() {
+ log.error("UnsupportedOperationException(\"Not implemented yet\");");
+ return PCMetrics.PCPartitionMetrics.CompressionStats.builder().build();
+ }
+
+ public Offsets calculateIncompleteMetrics() {
+ return Offsets.from(getIncompleteOffsetsBelowHighestSucceeded());
+ }
+
+}
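
To illustrate the lag semantics exposed here - with out-of-order processing, the highest completed and the highest *sequentially* succeeded offsets diverge, and getTraditionalConsumerLag() is based on the latter. A hand-built example (illustrative values only; the library populates these itself):

    import io.confluent.parallelconsumer.PCMetrics;
    import org.apache.kafka.common.TopicPartition;

    class LagSketch {
        public static void main(String[] args) {
            // offsets 0..9 seen; 0,1,2 and 5,6 succeeded; 3,4,7,8,9 incomplete
            var pm = PCMetrics.PCPartitionMetrics.builder()
                    .topicPartition(new TopicPartition("my-topic", 0))
                    .highestSeenOffset(9)
                    .highestCompletedOffset(6)           // highest individual success
                    .highestSequentialSucceededOffset(2) // first gap is at offset 3
                    .numberOfIncompletes(5)              // 3, 4, 7, 8, 9
                    .build();
            System.out.println(pm.getTraditionalConsumerLag()); // 9 - 2 = 7
        }
    }
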
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java
index afc9a3e2b..4dadfaf3e 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java
@@ -4,6 +4,9 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.Offsets;
+import io.confluent.parallelconsumer.PCMetrics;
+import io.confluent.parallelconsumer.PCMetrics.PCPartitionMetrics.IncompleteMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
@@ -17,10 +20,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -298,4 +298,27 @@ public boolean isDirty() {
return this.partitionStates.values().stream()
.anyMatch(PartitionState::isDirty);
}
+
+ public Map<TopicPartition, PCMetrics.PCPartitionMetrics> getMetrics() {
+ return getAssignedPartitions().values().stream()
+ .map(PartitionState::getMetrics)
+ .collect(Collectors.toMap(PCMetrics.PCPartitionMetrics::getTopicPartition, t -> t));
+ }
+
+ public void enrichWithIncompletes(final Map<TopicPartition, PCMetrics.PCPartitionMetrics> metrics) {
+ getAssignedPartitions().forEach((key, value) -> {
+ Offsets incompleteOffsets = value.calculateIncompleteMetrics();
+ var pcMetrics = Optional.ofNullable(metrics.get(key));
+ pcMetrics.ifPresent(metrics1 -> {
+ // the metrics objects are immutable, so rebuild the entry with the incomplete offsets added
+ var incompleteMetrics = Optional.of(new IncompleteMetrics(incompleteOffsets));
+ var enriched = metrics1.toBuilder().incompleteMetrics(incompleteMetrics).build();
+ metrics.put(key, enriched);
+ });
+ });
+ }
+
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
index a72187daf..f39679606 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ProcessingShard.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.RateLimiter;
@@ -163,4 +164,15 @@ private boolean isOrderRestricted() {
return options.getOrdering() != UNORDERED;
}
+ public PCMetrics.ShardMetrics getMetrics() {
+ var b = PCMetrics.ShardMetrics.builder();
+ b.shardKey(getKey());
+ b.shardSize(getCountOfWorkTracked());
+
+ // TODO: implement shard timing metrics
+// b.averageTimeSpentInQueue();
+// b.averageUserProcessingTime();
+
+ return b.build();
+ }
}
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java
index 7e9eab337..4f3c89c8d 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/ShardManager.java
@@ -5,6 +5,7 @@
*/
import io.confluent.csid.utils.LoopingResumingIterator;
+import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
@@ -59,6 +60,7 @@ public class ShardManager {
* @see WorkManager#getWorkIfAvailable()
*/
// performance: could disable/remove if using partition order - but probably not worth the added complexity in the code to handle an extra special case
+ @Getter(AccessLevel.PRIVATE)
+ private final Map<ShardKey, ProcessingShard<K, V>> processingShards = new ConcurrentHashMap<>();
/**
@@ -267,4 +269,9 @@ private void updateResumePoint(
+ public Map<ShardKey, PCMetrics.ShardMetrics> getMetrics() {
+ return getProcessingShards().values().stream()
+ .map(ProcessingShard::getMetrics)
+ .collect(Collectors.toMap(PCMetrics.ShardMetrics::getShardKey, t -> t));
+ }
}
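
Reading the shard view from the snapshot is symmetrical to the partition view (continuing the assumed `pc` handle from the sketches above):

    pc.calculateMetrics().getShardMetrics().forEach((shardKey, sm) ->
            System.out.printf("%s queued=%d%n", shardKey, sm.getShardSize()));
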
diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
index c69cbee2e..e224839be 100644
--- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
+++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkManager.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/
+import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.*;
import lombok.Getter;
@@ -272,4 +273,13 @@ public Optional getLowestRetryTime() {
public boolean isDirty() {
return pm.isDirty();
}
+
+ public void addMetricsTo(PCMetrics.PCMetricsBuilder<?, ? extends PCMetrics.PCMetricsBuilder<?, ?>> metrics) {
+ metrics.partitionMetrics(getPm().getMetrics());
+ metrics.shardMetrics(getSm().getMetrics());
+ }
+
+ public void enrichWithIncompletes(Map<TopicPartition, PCMetrics.PCPartitionMetrics> metrics) {
+ getPm().enrichWithIncompletes(metrics);
+ }
}
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
index 6cf618f9a..4e3479c19 100644
--- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/AbstractParallelEoSStreamProcessorTestBase.java
@@ -490,4 +490,5 @@ protected LongPollingMockConsumerSubject assertThatConsumer(Stri
.about(LongPollingMockConsumerSubject.mockConsumers())
.that(consumerSpy);
}
+
}
diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/PCMetricsTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/PCMetricsTest.java
new file mode 100644
index 000000000..87ad2acda
--- /dev/null
+++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/PCMetricsTest.java
@@ -0,0 +1,54 @@
+package io.confluent.parallelconsumer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+
+import static io.confluent.parallelconsumer.ManagedTruth.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * @author Antony Stubbs
+ * @see PCMetrics
+ */
+@Slf4j
+class PCMetricsTest extends ParallelEoSStreamProcessorTestBase {
+
+ @Test
+ void metricsBasics() {
+ final int quantity = 10_000;
+ ktu.sendRecords(quantity);
+
+ parallelConsumer.poll(recordContexts -> {
+ recordContexts.forEach(recordContext -> {
+ log.trace("Processing: {}", recordContext);
+ });
+ });
+
+ {
+ PCMetrics pcMetrics = parallelConsumer.calculateMetrics();
+ assertThat(pcMetrics).getPartitionMetrics().isNotEmpty();
+ }
+
+ // metrics have some data
+ await().untilAsserted(() -> {
+ PCMetrics pcMetrics = parallelConsumer.calculateMetrics();
+ PCMetrics.PCPartitionMetrics pcPartitionMetrics = pcMetrics.getPartitionMetrics().get(topicPartition);
+ assertThat(pcPartitionMetrics).getHighestSeenOffset().isAtLeast(400L);
+ assertThat(pcPartitionMetrics).getHighestCompletedOffset().isAtLeast(1L);
+ assertThat(pcPartitionMetrics).getNumberOfIncompletes().isEqualTo(0);
+ });
+
+ // metrics show processing is complete
+ await().untilAsserted(() -> {
+ PCMetrics pcMetrics = parallelConsumer.calculateMetricsWithIncompletes();
+ PCMetrics.PCPartitionMetrics pcPartitionMetrics = pcMetrics.getPartitionMetrics().get(topicPartition);
+ assertThat(pcPartitionMetrics).getHighestCompletedOffset().isEqualTo(quantity - 1);
+ assertThat(pcPartitionMetrics).getNumberOfIncompletes().isEqualTo(0);
+ assertThat(pcMetrics).getTotalNumberOfIncompletes().isEqualTo(0);
+ var incompleteMetrics = pcPartitionMetrics.getIncompleteMetrics();
+ assertThat(incompleteMetrics).isPresent();
+ var incompletes = incompleteMetrics.get().getIncompleteOffsets();
+ assertThat(incompletes).isEmpty();
+ });
+ }
+}
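
Since PCMetrics is a plain value model, exporting to a metrics registry stays in user land; for example, against Micrometer (a dependency assumed by this sketch, not one this PR adds):

    import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
    import io.micrometer.core.instrument.Gauge;
    import io.micrometer.core.instrument.MeterRegistry;

    class PCMetricsExporter {
        static void register(MeterRegistry registry, ParallelEoSStreamProcessor<String, String> pc) {
            // gauges sample lazily, so every scrape takes a fresh snapshot
            Gauge.builder("pc.incompletes", pc, p -> p.calculateMetrics().getTotalNumberOfIncompletes())
                    .register(registry);
            Gauge.builder("pc.shards", pc, p -> p.calculateMetrics().getNumberOfShards())
                    .register(registry);
        }
    }
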