
Commit

START: Health metrics
astubbs committed Nov 2, 2022
1 parent 66b0158 commit fa41172
Showing 12 changed files with 359 additions and 5 deletions.
2 changes: 2 additions & 0 deletions parallel-consumer-core/pom.xml
@@ -134,6 +134,8 @@
<class>io.confluent.parallelconsumer.state.ProcessingShard</class>
<class>io.confluent.parallelconsumer.state.ShardKey</class>
<class>io.confluent.parallelconsumer.offsets.OffsetEncoding</class>
<class>io.confluent.parallelconsumer.PCMetrics</class>
<class>io.confluent.parallelconsumer.Offsets</class>
</classes>
<legacyClasses>
<!-- todo check legacy is recursive -->
@@ -0,0 +1,62 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.Getter;
import lombok.ToString;
import lombok.Value;
import lombok.experimental.Delegate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.stream.Collectors;

/**
 * A collection of Kafka {@link Offset}s.
 */
@ToString
public class Offsets {

    @Getter
    @Delegate
    private final List<Long> rawOffsets;

    public Offsets(List<Long> records) {
        this.rawOffsets = records;
    }

    public static Offsets fromRecords(List<RecordContext<?, ?>> records) {
        return fromLongs(records.stream()
                .map(RecordContext::offset)
                .collect(Collectors.toUnmodifiableList()));
    }

    // due to type erasure, can't use method overloading
    public static Offsets fromLongs(List<Long> rawOffsetsIn) {
        return new Offsets(rawOffsetsIn);
    }

    public static Offsets fromArray(long... rawOffsetsIn) {
        return new Offsets(Arrays.stream(rawOffsetsIn).boxed().collect(Collectors.toList()));
    }

    public static Offsets from(SortedSet<Long> incompleteOffsetsBelowHighestSucceeded) {
        return new Offsets(new ArrayList<>(incompleteOffsetsBelowHighestSucceeded));
    }

    /**
     * Class value for a Kafka offset, to avoid being Longly typed.
     */
    @Value
    public static class Offset {
        long value;

        public static Offset of(Long offset) {
            return new Offset(offset);
        }
    }
}
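
Aside: a minimal usage sketch of the factory methods above. The offset values and class name are made up for illustration; only the Offsets API itself comes from this commit.

import io.confluent.parallelconsumer.Offsets;

import java.util.List;
import java.util.TreeSet;

class OffsetsUsageSketch {
    public static void main(String[] args) {
        // illustrative offset values only
        Offsets fromVarargs = Offsets.fromArray(1L, 5L, 9L);
        Offsets fromList = Offsets.fromLongs(List.of(1L, 5L, 9L));
        Offsets fromSet = Offsets.from(new TreeSet<>(List.of(1L, 5L, 9L)));

        // @Delegate exposes the List<Long> API directly on Offsets
        System.out.println(fromVarargs.size());     // 3
        System.out.println(fromList.contains(5L));  // true
        System.out.println(fromSet);                // Offsets(rawOffsets=[1, 5, 9]) via @ToString
    }
}
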
@@ -0,0 +1,113 @@
package io.confluent.parallelconsumer;

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.state.ShardKey;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.SuperBuilder;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Optional;

/**
 * Metrics model for Parallel Consumer
 *
 * @author Antony Stubbs
 */
@Value
@SuperBuilder(toBuilder = true)
public class PCMetrics {

    long dynamicLoadFactor;

    Map<TopicPartition, PCPartitionMetrics> partitionMetrics;

    Map<ShardKey, ShardMetrics> shardMetrics;

    PollerMetrics pollerMetrics;

    /**
     * The number of partitions assigned to this consumer
     */
    public long getNumberOfPartitions() {
        return partitionMetrics.size();
    }

    /**
     * The number of shards (queues) currently managed - depends on ordering and data key set
     */
    public long getNumberOfShards() {
        return shardMetrics.size();
    }

    public long getTotalNumberOfIncompletes() {
        return partitionMetrics.values().stream()
                .mapToLong(PCPartitionMetrics::getNumberOfIncompletes)
                .sum();
    }

    @Value
    @SuperBuilder(toBuilder = true)
    public static class PCPartitionMetrics {
        TopicPartition topicPartition;
        long lastCommittedOffset;
        long numberOfIncompletes;
        long highestCompletedOffset;
        long highestSeenOffset;
        long highestSequentialSucceededOffset;
        long epoch;
        CompressionStats compressionStats;

        /**
         * @see AbstractParallelEoSStreamProcessor#calculateMetricsWithIncompletes()
         */
        @Builder.Default
        Optional<IncompleteMetrics> incompleteMetrics = Optional.empty();

        public long getTraditionalConsumerLag() {
            return highestSeenOffset - highestSequentialSucceededOffset;
        }

        @Value
        public static class IncompleteMetrics {
            Offsets incompleteOffsets;
        }

        @Value
        @SuperBuilder(toBuilder = true)
        public static class CompressionStats {
            long offsetsEncodedPerBit;
            long offsetsEncodedPerByte;
            long bytesUsedForEncoding;
            double fractionOfEncodingSpaceUsed;
            OffsetEncoding bestEncoding;
        }
    }

    @Value
    @SuperBuilder(toBuilder = true)
    public static class ShardMetrics {
        ShardKey shardKey;
        /**
         * Number of records queued for processing in this shard
         */
        long shardSize;
        long averageUserProcessingTime;
        long averageTimeSpentInQueue;
    }

    /**
     * Metrics for the {@link io.confluent.parallelconsumer.internal.BrokerPollSystem} sub-system.
     */
    @Value
    @SuperBuilder(toBuilder = true)
    public static class PollerMetrics {
        State state;
        boolean paused;
        Map<TopicPartition, Boolean> pausedPartitions;
    }

}
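
Aside: a hedged sketch of how a caller might read a PCMetrics snapshot once built. The reporting method and its output format are illustrative assumptions, not part of the commit; the getters used are generated by Lombok from the fields above.

import io.confluent.parallelconsumer.PCMetrics;

class PCMetricsReportSketch {

    // illustrative only: print a summary plus per-partition lag from a snapshot
    static void report(PCMetrics metrics) {
        System.out.printf("partitions=%d shards=%d incompletes=%d loadFactor=%d%n",
                metrics.getNumberOfPartitions(),
                metrics.getNumberOfShards(),
                metrics.getTotalNumberOfIncompletes(),
                metrics.getDynamicLoadFactor());

        metrics.getPartitionMetrics().forEach((tp, pm) ->
                System.out.printf("%s lag=%d highestSeen=%d incompletes=%d%n",
                        tp, pm.getTraditionalConsumerLag(), pm.getHighestSeenOffset(), pm.getNumberOfIncompletes()));
    }
}
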
@@ -1295,4 +1295,36 @@ public void resumeIfPaused() {
}
}


    /**
     * Gathers metrics from the {@link ParallelConsumer} for monitoring.
     */
    public PCMetrics calculateMetrics() {
        var metrics = PCMetrics.builder();

        addMetricsTo(metrics);

        getWm().addMetricsTo(metrics);

        return metrics.build();
    }

    /**
     * Like {@link #calculateMetrics()}, but also gathers the incomplete offsets for each partition (see
     * {@link PCMetrics.PCPartitionMetrics.IncompleteMetrics}).
     */
    public PCMetrics calculateMetricsWithIncompletes() {
        PCMetrics pcMetrics = calculateMetrics();
        Map<TopicPartition, PCMetrics.PCPartitionMetrics> partitionMetrics = pcMetrics.getPartitionMetrics();

        getWm().enrichWithIncompletes(partitionMetrics);

        PCMetrics.PCMetricsBuilder<?, ?> metrics = pcMetrics.toBuilder();
        return metrics.build();
    }

    protected void addMetricsTo(PCMetrics.PCMetricsBuilder<?, ? extends PCMetrics.PCMetricsBuilder<?, ?>> metrics) {
        metrics.dynamicLoadFactor(dynamicExtraLoadFactor.getCurrentFactor());
        metrics.pollerMetrics(brokerPollSubsystem.getMetrics());
    }

}
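
Aside: a sketch of periodic metrics collection from application code, assuming the concrete ParallelEoSStreamProcessor inherits calculateMetrics() from this abstract class as added above. The scheduling interval and the println sink are illustrative assumptions.

import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class MetricsPollingSketch {

    // illustrative: take a metrics snapshot from the processor on a fixed schedule
    static ScheduledExecutorService startPolling(ParallelEoSStreamProcessor<String, String> pc) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            PCMetrics snapshot = pc.calculateMetrics(); // does not gather incomplete offsets
            // swap in calculateMetricsWithIncompletes() if the incomplete offset lists are needed
            System.out.println(snapshot);
        }, 10, 10, TimeUnit.SECONDS);
        return scheduler;
    }
}
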
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.state.WorkManager;
@@ -367,4 +368,14 @@ public void resumePollingAndWorkRegistrationIfPaused() {
log.info("Skipping transition of broker poll system to state running. Current state is {}.", this.runState);
}
}

    public PCMetrics.PollerMetrics getMetrics() {
        var b = PCMetrics.PollerMetrics.builder();
        b.state(runState);
        b.paused(isPausedForThrottling());

        // todo: per-partition pause state not yet collected
        // b.pausedPartitions();
        return b.build();
    }
}
@@ -4,6 +4,8 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.Offsets;
import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
@@ -629,5 +631,30 @@ boolean checkIfWorkIsStale(final WorkContainer<?, ?> workContainer) {
return false;
}

}
    public PCMetrics.PCPartitionMetrics getMetrics() {
        var b = PCMetrics.PCPartitionMetrics.builder();
        b.topicPartition(getTp());

        //
        b.highestSeenOffset(getOffsetHighestSeen());
        b.epoch(getPartitionsAssignmentEpoch());
        b.highestCompletedOffset(getOffsetHighestSucceeded());
        b.numberOfIncompletes(incompleteOffsets.size());

        //
        // b.compressionStats(getCompressionStats());

        //
        return b.build();
    }

    private PCMetrics.PCPartitionMetrics.CompressionStats getCompressionStats() {
        log.error("UnsupportedOperationException(\"Not implemented yet\");");
        return PCMetrics.PCPartitionMetrics.CompressionStats.builder().build();
    }

    public Offsets calculateIncompleteMetrics() {
        return Offsets.from(getIncompleteOffsetsBelowHighestSucceeded());
    }

}
@@ -4,6 +4,9 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.Offsets;
import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.PCMetrics.PCPartitionMetrics.IncompleteMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
@@ -17,10 +20,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@@ -298,4 +298,27 @@ public boolean isDirty() {
        return this.partitionStates.values().stream()
                .anyMatch(PartitionState::isDirty);
    }

    public Map<TopicPartition, PCMetrics.PCPartitionMetrics> getMetrics() {
        return getAssignedPartitions().values().stream()
                .map(PartitionState::getMetrics)
                .collect(Collectors.toMap(PCMetrics.PCPartitionMetrics::getTopicPartition, t -> t));
    }

    public void enrichWithIncompletes(final Map<TopicPartition, PCMetrics.PCPartitionMetrics> metrics) {
        getAssignedPartitions().forEach((key, value) -> {
            // code smell? - a kind of weird way to do this
            Offsets incompleteOffsets = value.calculateIncompleteMetrics();
            var pcMetrics = Optional.ofNullable(metrics.get(key));
            pcMetrics.ifPresent(metrics1 -> {
                // replace the metrics with the new one with the incomplete offsets added
                var b = metrics1.toBuilder();
                var incompleteMetrics = Optional.of(new IncompleteMetrics(incompleteOffsets));
                var build = b.incompleteMetrics(incompleteMetrics).build();
                // replace the old metrics with the new one
                metrics.put(key, build);
            });
        });
    }

}
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.RateLimiter;
@@ -163,4 +164,15 @@ private boolean isOrderRestricted() {
        return options.getOrdering() != UNORDERED;
    }

    public PCMetrics.ShardMetrics getMetrics() {
        var b = PCMetrics.ShardMetrics.builder();
        b.shardKey(getKey());
        b.shardSize(getCountOfWorkTracked());

        //
        // b.averageTimeSpentInQueue();
        // b.averageUserProcessingTime();

        return b.build();
    }
}
@@ -5,6 +5,7 @@
*/

import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
@@ -59,6 +60,7 @@ public class ShardManager<K, V> {
     * @see WorkManager#getWorkIfAvailable()
     */
    // performance: could disable/remove if using partition order - but probably not worth the added complexity in the code to handle an extra special case
    @Getter(AccessLevel.PRIVATE)
    private final Map<ShardKey, ProcessingShard<K, V>> processingShards = new ConcurrentHashMap<>();

    /**
@@ -267,4 +269,9 @@ private void updateResumePoint(Optional<Map.Entry<ShardKey, ProcessingShard<K, V
}
}

    public Map<ShardKey, PCMetrics.ShardMetrics> getMetrics() {
        return getProcessingShards().values().stream()
                .map(ProcessingShard::getMetrics)
                .collect(Collectors.toMap(PCMetrics.ShardMetrics::getShardKey, t -> t));
    }
}