fixes #71: Feature: Health check and metrics #464

Closed · wants to merge 1 commit
2 changes: 2 additions & 0 deletions parallel-consumer-core/pom.xml
@@ -134,6 +134,8 @@
<class>io.confluent.parallelconsumer.state.ProcessingShard</class>
<class>io.confluent.parallelconsumer.state.ShardKey</class>
<class>io.confluent.parallelconsumer.offsets.OffsetEncoding</class>
<class>io.confluent.parallelconsumer.PCMetrics</class>
<class>io.confluent.parallelconsumer.Offsets</class>
</classes>
<legacyClasses>
<!-- todo check legacy is recursive -->
Offsets.java (new file)
@@ -0,0 +1,62 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.Getter;
import lombok.ToString;
import lombok.Value;
import lombok.experimental.Delegate;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.stream.Collectors;

/**
 * A list of {@link Offset}s, not necessarily a contiguous range.
*/
@ToString
public class Offsets {

@Getter
@Delegate
private final List<Long> rawOffsets;

public Offsets(List<Long> records) {
this.rawOffsets = records;
}

public static Offsets fromRecords(List<RecordContext<?, ?>> records) {
return fromLongs(records.stream()
.map(RecordContext::offset)
.collect(Collectors.toList()));
}

// due to type erasure, can't use method overloading
public static Offsets fromLongs(List<Long> rawOffsetsIn) {
return new Offsets(rawOffsetsIn);
}

public static Offsets fromArray(long... rawOffsetsIn) {
return new Offsets(Arrays.stream(rawOffsetsIn).boxed().collect(Collectors.toList()));
}

public static Offsets from(SortedSet<Long> incompleteOffsetsBelowHighestSucceeded) {
return new Offsets(new ArrayList<>(incompleteOffsetsBelowHighestSucceeded));
}

/**
 * Value class for a Kafka offset, to avoid being "Longly" typed (cf. stringly typed).
*/
@Value
public static class Offset {
long value;

public static Offset of(Long offset) {
return new Offset(offset);
}
}
}
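
A quick usage sketch of the factory methods above (snippet only; offset values are illustrative, and java.util.List is assumed imported):

    Offsets a = Offsets.fromArray(100L, 101L, 105L);          // varargs of raw longs
    Offsets b = Offsets.fromLongs(List.of(100L, 101L, 105L)); // boxed list
    // @Delegate exposes the wrapped List<Long> API directly on Offsets
    long first = a.get(0); // 100
    int count = a.size();  // 3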
PCMetrics.java (new file)
@@ -0,0 +1,117 @@
package io.confluent.parallelconsumer;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.State;
import io.confluent.parallelconsumer.offsets.OffsetEncoding;
import io.confluent.parallelconsumer.state.ShardKey;
import lombok.Builder;
import lombok.Value;
import lombok.experimental.SuperBuilder;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Optional;

/**
* Metrics model for Parallel Consumer
*
* @author Antony Stubbs
*/
@Value
@SuperBuilder(toBuilder = true)
public class PCMetrics {

long dynamicLoadFactor;

Map<TopicPartition, PCPartitionMetrics> partitionMetrics;

Map<ShardKey, ShardMetrics> shardMetrics;

PollerMetrics pollerMetrics;

/**
 * The number of partitions assigned to this consumer.
*/
public long getNumberOfPartitions() {
return partitionMetrics.size();
}

/**
 * The number of shards (queues) currently managed; depends on the ordering mode and the data's key set.
*/
public long getNumberOfShards() {
return shardMetrics.size();
}

public long getTotalNumberOfIncompletes() {
return partitionMetrics.values().stream()
.mapToLong(PCPartitionMetrics::getNumberOfIncompletes)
.sum();
}

@Value
@SuperBuilder(toBuilder = true)
public static class PCPartitionMetrics {
TopicPartition topicPartition;
long lastCommittedOffset;
long numberOfIncompletes;
long highestCompletedOffset;
long highestSeenOffset;
long highestSequentialSucceededOffset;
long epoch;
CompressionStats compressionStats;

/**
* @see AbstractParallelEoSStreamProcessor#calculateMetricsWithIncompletes()
*/
@Builder.Default
Optional<IncompleteMetrics> incompleteMetrics = Optional.empty();

public long getTraditionalConsumerLag() {
return highestSeenOffset - highestSequentialSucceededOffset;
}

@Value
public static class IncompleteMetrics {
Offsets incompleteOffsets;
}

@Value
@SuperBuilder(toBuilder = true)
public static class CompressionStats {
long offsetsEncodedPerBit;
long offsetsEncodedPerByte;
long bytesUsedForEncoding;
double fractionOfEncodingSpaceUsed;
OffsetEncoding bestEncoding;
}
}

@Value
@SuperBuilder(toBuilder = true)
public static class ShardMetrics {
ShardKey shardKey;
/**
 * The number of records queued for processing in this shard.
*/
long shardSize;
long averageUserProcessingTime;
long averageTimeSpentInQueue;
}

/**
* Metrics for the {@link io.confluent.parallelconsumer.internal.BrokerPollSystem} sub-system.
*/
@Value
@SuperBuilder(toBuilder = true)
public static class PollerMetrics {
State state;
boolean paused;
Map<TopicPartition, Boolean> pausedPartitions;
}

}
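
A sketch of consuming this model, e.g. summing the per-partition lag defined by getTraditionalConsumerLag(). It assumes a processor exposing calculateMetrics() (added further down in this PR) and an SLF4J logger named log:

    PCMetrics metrics = processor.calculateMetrics();
    long totalLag = metrics.getPartitionMetrics().values().stream()
            .mapToLong(PCMetrics.PCPartitionMetrics::getTraditionalConsumerLag)
            .sum();
    log.info("partitions={} shards={} totalLag={}",
            metrics.getNumberOfPartitions(), metrics.getNumberOfShards(), totalLag);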
AbstractParallelEoSStreamProcessor.java
@@ -1295,4 +1295,36 @@ public void resumeIfPaused() {
}
}


/**
 * Gather metrics from the {@link ParallelConsumer}, for monitoring.
*/
public PCMetrics calculateMetrics() {
var metrics = PCMetrics.builder();

addMetricsTo(metrics);

getWm().addMetricsTo(metrics);

return metrics.build();
}

/**
 * As {@link #calculateMetrics()}, but additionally gathers the incomplete offsets for each partition
 * ({@link PCMetrics.PCPartitionMetrics.IncompleteMetrics}).
*/
public PCMetrics calculateMetricsWithIncompletes() {
PCMetrics pcMetrics = calculateMetrics();
Map<TopicPartition, PCMetrics.PCPartitionMetrics> partitionMetrics = pcMetrics.getPartitionMetrics();

getWm().enrichWithIncompletes(partitionMetrics);

PCMetrics.PCMetricsBuilder<?, ?> metrics = pcMetrics.toBuilder();
return metrics.build();
}

protected void addMetricsTo(PCMetrics.PCMetricsBuilder<?, ? extends PCMetrics.PCMetricsBuilder<?, ?>> metrics) {
metrics.dynamicLoadFactor(dynamicExtraLoadFactor.getCurrentFactor());
metrics.pollerMetrics(brokerPollSubsystem.getMetrics());
}

}
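
Tying these methods to the PR's health-check goal, a hedged sketch of a periodic check built on them; the threshold, period, and executor wiring are assumptions for illustration, not part of this diff:

    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        PCMetrics metrics = processor.calculateMetrics();
        boolean healthy = metrics.getTotalNumberOfIncompletes() < 10_000
                && !metrics.getPollerMetrics().isPaused();
        if (!healthy) {
            // heavier variant: also materialises the incomplete offsets per partition
            log.warn("unhealthy: {}", processor.calculateMetricsWithIncompletes());
        }
    }, 0, 30, TimeUnit.SECONDS);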
BrokerPollSystem.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.state.WorkManager;
@@ -367,4 +368,14 @@ public void resumePollingAndWorkRegistrationIfPaused() {
log.info("Skipping transition of broker poll system to state running. Current state is {}.", this.runState);
}
}

public PCMetrics.PollerMetrics getMetrics() {
var b = PCMetrics.PollerMetrics.builder();
b.state(runState);
b.paused(isPausedForThrottling());

// TODO pausedPartitions not yet populated:
// b.pausedPartitions();
return b.build();
}
}
PartitionState.java
@@ -4,6 +4,8 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.Offsets;
import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
import io.confluent.parallelconsumer.internal.EpochAndRecordsMap;
import io.confluent.parallelconsumer.internal.PCModule;
@@ -629,5 +631,30 @@ boolean checkIfWorkIsStale(final WorkContainer<?, ?> workContainer) {
return false;
}

}

public PCMetrics.PCPartitionMetrics getMetrics() {

Review comment (Contributor Author): Should we invert this? Move the construction into the metrics class, instead of the state class? (pass the metrics class the state)

var b = PCMetrics.PCPartitionMetrics.builder();
b.topicPartition(getTp());

//
b.highestSeenOffset(getOffsetHighestSeen());
b.epoch(getPartitionsAssignmentEpoch());
b.highestCompletedOffset(getOffsetHighestSucceeded());
b.numberOfIncompletes(incompleteOffsets.size());

// TODO compression stats not yet wired up, see getCompressionStats():
// b.compressionStats(getCompressionStats());

//
return b.build();
}

private PCMetrics.PCPartitionMetrics.CompressionStats getCompressionStats() {
// placeholder: logs instead of throwing, and returns empty stats
log.error("UnsupportedOperationException(\"Not implemented yet\");");
return PCMetrics.PCPartitionMetrics.CompressionStats.builder().build();
}

public Offsets calculateIncompleteMetrics() {
return Offsets.from(getIncompleteOffsetsBelowHighestSucceeded());
}

}
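
On the inline review question above (inverting the construction), a hypothetical sketch of the alternative: a static factory on the metrics class that is handed the state. Accessor visibility on PartitionState is assumed; none of this is in the diff:

    // would live in PCMetrics.PCPartitionMetrics, replacing PartitionState#getMetrics()
    public static PCPartitionMetrics from(PartitionState<?, ?> state) {
        return PCPartitionMetrics.builder()
                .topicPartition(state.getTp())
                .highestSeenOffset(state.getOffsetHighestSeen())
                .epoch(state.getPartitionsAssignmentEpoch())
                .highestCompletedOffset(state.getOffsetHighestSucceeded())
                .build();
    }

This would keep all metric shapes and their construction in one place, leaving the state classes as plain data sources.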
PartitionStateManager.java
@@ -4,6 +4,9 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.Offsets;
import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.PCMetrics.PCPartitionMetrics.IncompleteMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
import io.confluent.parallelconsumer.internal.BrokerPollSystem;
@@ -17,10 +20,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@@ -298,4 +298,27 @@ public boolean isDirty() {
return this.partitionStates.values().stream()
.anyMatch(PartitionState::isDirty);
}

public Map<TopicPartition, PCMetrics.PCPartitionMetrics> getMetrics() {
return getAssignedPartitions().values().stream()
.map(PartitionState::getMetrics)
.collect(Collectors.toMap(PCMetrics.PCPartitionMetrics::getTopicPartition, t -> t));
}

public void enrichWithIncompletes(final Map<TopicPartition, PCMetrics.PCPartitionMetrics> metrics) {
getAssignedPartitions().forEach((key, value) -> {
// code smell? - a kind of weird way to do this
Offsets incompleteOffsets = value.calculateIncompleteMetrics();
var pcMetrics = Optional.ofNullable(metrics.get(key));
pcMetrics.ifPresent(metrics1 -> {
// replace the metrics with the new one with the incomplete offsets added
var b = metrics1.toBuilder();
var incompleteMetrics = Optional.of(new IncompleteMetrics(incompleteOffsets));
var build = b.incompleteMetrics(incompleteMetrics).build();
// replace the old metrics with the new one
metrics.put(key, build);
});
});
}

}
ProcessingShard.java
@@ -4,6 +4,7 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.RateLimiter;
@@ -163,4 +164,15 @@ private boolean isOrderRestricted() {
return options.getOrdering() != UNORDERED;
}

public PCMetrics.ShardMetrics getMetrics() {
var b = PCMetrics.ShardMetrics.builder();
b.shardKey(getKey());
b.shardSize(getCountOfWorkTracked());

// TODO timing metrics not yet implemented:
// b.averageTimeSpentInQueue();
// b.averageUserProcessingTime();

return b.build();
}
}
ShardManager.java
@@ -5,6 +5,7 @@
*/

import io.confluent.csid.utils.LoopingResumingIterator;
import io.confluent.parallelconsumer.PCMetrics;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor;
@@ -59,6 +60,7 @@ public class ShardManager<K, V> {
* @see WorkManager#getWorkIfAvailable()
*/
// performance: could disable/remove if using partition order - but probably not worth the added complexity in the code to handle an extra special case
@Getter(AccessLevel.PRIVATE)
private final Map<ShardKey, ProcessingShard<K, V>> processingShards = new ConcurrentHashMap<>();

/**
@@ -267,4 +269,9 @@ private void updateResumePoint(Optional<Map.Entry<ShardKey, ProcessingShard<K, V
}
}

public Map<ShardKey, PCMetrics.ShardMetrics> getMetrics() {
return getProcessingShards().values().stream()
.map(ProcessingShard::getMetrics)
.collect(Collectors.toMap(PCMetrics.ShardMetrics::getShardKey, t -> t));
}
}