fixes #409: Adds support for compacted topics and commit offset resetting #425

Merged
Changes from all commits (35 commits)
5e8bfb6
START: fixes #409: Truncate state on offset high reset
astubbs Sep 30, 2022
d1c42d6
review
astubbs Sep 30, 2022
5dca73c
review
astubbs Sep 30, 2022
23c289f
step - continuous checking and restructure
astubbs Sep 30, 2022
2ea3573
incomplete offsets docs
astubbs Oct 1, 2022
835888a
docs and rename
astubbs Oct 1, 2022
f58c467
fixup! review
astubbs Oct 1, 2022
000969c
Revert "step - continuous checking and restructure"
astubbs Oct 1, 2022
b4af22a
clean up tangents, clarify
astubbs Oct 1, 2022
26ee4b8
pruneRemovedTrackedIncompleteOffsets and detect committed offset rese…
astubbs Oct 4, 2022
87dd82e
test scenarios
astubbs Oct 5, 2022
5c41b3f
tests
astubbs Oct 5, 2022
0a13ca6
issue #409 captured in test and fixed
astubbs Oct 6, 2022
d713b29
three scenarios captured and fixed
astubbs Oct 6, 2022
3336746
SAVE: fixing tests
astubbs Oct 6, 2022
66fe6f5
review
astubbs Oct 11, 2022
e977ebe
Merge remote-tracking branch 'origin/master' into fixes/state-truncat…
astubbs Oct 11, 2022
05e5ddb
changelog
astubbs Oct 11, 2022
747b132
review
astubbs Oct 11, 2022
a8e28c3
fix OffsetEncodingTests
astubbs Oct 11, 2022
6b93b1f
Draft PartitionStateCommittedOffsetIT, partially passing
astubbs Oct 11, 2022
e630134
compaction test works
astubbs Oct 11, 2022
1b73d18
run compaction on seperate broker
astubbs Oct 11, 2022
bd52b2c
compaction test review, fix higher / lower tests
astubbs Oct 11, 2022
97f3c43
offset removed test works for RESET == EARLIEST
astubbs Oct 11, 2022
9bcb14e
PartitionStateCommittedOffsetIT all green
astubbs Oct 12, 2022
a1cffd0
temp make method public for TG
astubbs Oct 12, 2022
f5b5ce2
fix scm tag to HEAD
astubbs Oct 12, 2022
80ab192
validate first
astubbs Oct 12, 2022
17d13eb
review
astubbs Oct 12, 2022
7be3ab0
Merge branch 'master' into fixes/state-truncation-reset-latest
astubbs Oct 12, 2022
a5ebf6a
rollback public -> protected. Add support for protected in TG later
astubbs Oct 12, 2022
b6a39cb
fix dangling clients - cannot reach broker errors
astubbs Oct 12, 2022
2e6deb0
docs
astubbs Oct 12, 2022
1d2902f
fixup! fix dangling clients - cannot reach broker errors
astubbs Oct 12, 2022
8 changes: 8 additions & 0 deletions CHANGELOG.adoc
@@ -14,6 +14,14 @@ ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.4

=== Fixes

* fixes #409: Adds support for compacted topics and commit offset resetting (#425)
** Truncate the offset state when bootstrap polled offset higher or lower than committed
** Prune missing records from the tracked incomplete offset state, when they're missing from polled batches

== 0.5.2.3

=== Improvements
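The two fixes in the changelog entry can be sketched as follows. This is a hypothetical, heavily simplified model (class and field names are invented for illustration); the real logic lives in `PartitionState` / `PartitionStateManager` and operates on polled record batches:

```java
import java.util.Set;
import java.util.TreeSet;

// Simplified sketch of the PR's two fixes (hypothetical names).
public class TruncationSketch {
    final TreeSet<Long> incompleteOffsets = new TreeSet<>();
    long nextExpectedPolledOffset = 0L;

    // Fix 1: on bootstrap, truncate tracked state if the first polled offset
    // is higher (compaction, retention, or reset to latest) or lower (reset
    // to earliest) than the committed state expected.
    void maybeTruncate(long firstPolledOffset) {
        if (firstPolledOffset > nextExpectedPolledOffset) {
            // records below the polled offset no longer exist - drop their state
            incompleteOffsets.headSet(firstPolledOffset).clear();
        } else if (firstPolledOffset < nextExpectedPolledOffset) {
            // offsets were reset backwards - previous state is no longer valid
            incompleteOffsets.clear();
        }
        nextExpectedPolledOffset = firstPolledOffset;
    }

    // Fix 2: prune tracked incompletes missing from a polled batch covering
    // [batchLow, batchHigh] - they were compacted away.
    void pruneMissing(long batchLow, long batchHigh, Set<Long> polledOffsets) {
        incompleteOffsets.subSet(batchLow, true, batchHigh, true)
                .removeIf(offset -> !polledOffsets.contains(offset));
    }
}
```

`TreeSet`'s `headSet`/`subSet` views make both truncation directions a single range operation, which is roughly the shape of the bookkeeping the PR adds.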
8 changes: 8 additions & 0 deletions README.adoc
@@ -1287,6 +1287,14 @@ ifndef::github_name[]
toc::[]
endif::[]

== 0.5.2.4

=== Fixes

* fixes #409: Adds support for compacted topics and commit offset resetting (#425)
** Truncate the offset state when bootstrap polled offset higher or lower than committed
** Prune missing records from the tracked incomplete offset state, when they're missing from polled batches

== 0.5.2.3

=== Improvements
@@ -8,21 +8,23 @@
import lombok.experimental.UtilityClass;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.time.Duration.ofMillis;

@UtilityClass
public class JavaUtils {

public static <T> Optional<T> getLast(final List<T> commitHistory) {
if (commitHistory.isEmpty()) return Optional.empty();
return Optional.of(commitHistory.get(commitHistory.size() - 1));
public static <T> Optional<T> getLast(final List<T> someList) {
if (someList.isEmpty()) return Optional.empty();
return Optional.of(someList.get(someList.size() - 1));
}

public static <T> Optional<T> getFirst(final List<T> someList) {
return someList.isEmpty() ? Optional.empty() : Optional.of(someList.get(0));
}

public static <T> Optional<T> getOnlyOne(final Map<String, T> stringMapMap) {
@@ -56,4 +58,19 @@ public static <K, V1, V2> Map<K, V2> remap(Map<K, V1> map,
));
}

public static List<String> getRandom(List<String> list, int quantity) {
if (list.size() < quantity) {
throw new IllegalArgumentException("List size is less than quantity");
}

return createRandomIntStream(list.size())
.limit(quantity)
.map(list::get)
.collect(Collectors.toList());
}

private static Stream<Integer> createRandomIntStream(int range) {
final Random random = new Random();
return Stream.generate(() -> random.nextInt(range));
}
}
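The helpers added to `JavaUtils` above can be exercised with the self-contained sketch below, which reproduces the diff's logic in a standalone class. Note that `getRandom` samples indices from an unbounded random stream, so the same element can be picked more than once (sampling with replacement):

```java
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JavaUtilsDemo {
    // mirrors the diff's getFirst/getLast
    static <T> Optional<T> getFirst(List<T> list) {
        return list.isEmpty() ? Optional.empty() : Optional.of(list.get(0));
    }

    static <T> Optional<T> getLast(List<T> list) {
        return list.isEmpty() ? Optional.empty() : Optional.of(list.get(list.size() - 1));
    }

    // mirrors the diff's getRandom: picks `quantity` random elements; the
    // unbounded random index stream means duplicates are possible
    static List<String> getRandom(List<String> list, int quantity) {
        if (list.size() < quantity) {
            throw new IllegalArgumentException("List size is less than quantity");
        }
        Random random = new Random();
        return Stream.generate(() -> random.nextInt(list.size()))
                .limit(quantity)
                .map(list::get)
                .collect(Collectors.toList());
    }
}
```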
@@ -12,6 +12,7 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.time.Duration;
import java.util.Objects;
@@ -31,6 +32,7 @@
@Builder(toBuilder = true)
@ToString
@FieldNameConstants
@InterfaceStability.Evolving
public class ParallelConsumerOptions<K, V> {

/**
@@ -244,19 +244,19 @@ protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOp
protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOptions, PCModule<K, V> module) {
Objects.requireNonNull(newOptions, "Options must be supplied");

options = newOptions;
this.consumer = options.getConsumer();

validateConfiguration();

module.setParallelEoSStreamProcessor(this);

log.info("Confluent Parallel Consumer initialise... Options: {}", newOptions);
log.info("Confluent Parallel Consumer initialise... groupId: {}, Options: {}",
newOptions.getConsumer().groupMetadata().groupId(),
newOptions);

options = newOptions;
options.validate();

this.dynamicExtraLoadFactor = module.dynamicExtraLoadFactor();
this.consumer = options.getConsumer();

checkGroupIdConfigured(consumer);
checkNotSubscribed(consumer);
checkAutoCommitIsDisabled(consumer);

workerThreadPool = setupWorkerPool(newOptions.getMaxConcurrency());

@@ -276,6 +276,14 @@ protected AbstractParallelEoSStreamProcessor(ParallelConsumerOptions<K, V> newOp
}
}

private void validateConfiguration() {
options.validate();

checkGroupIdConfigured(consumer);
checkNotSubscribed(consumer);
checkAutoCommitIsDisabled(consumer);
}

private void checkGroupIdConfigured(final org.apache.kafka.clients.consumer.Consumer<K, V> consumer) {
try {
consumer.groupMetadata();
@@ -27,7 +27,7 @@ public EpochAndRecordsMap(ConsumerRecords<K, V> poll, PartitionStateManager<K, V
poll.partitions().forEach(partition -> {
var records = poll.records(partition);
Long epochOfPartition = pm.getEpochOfPartition(partition);
RecordsAndEpoch entry = new RecordsAndEpoch(epochOfPartition, records);
RecordsAndEpoch entry = new RecordsAndEpoch(partition, epochOfPartition, records);
recordMap.put(partition, entry);
});
}
@@ -63,6 +63,7 @@ public int count() {

@Value
public class RecordsAndEpoch {
@NonNull TopicPartition topicPartition;
@NonNull Long epochOfPartitionAtPoll;
@NonNull List<ConsumerRecord<K, V>> records;
}
@@ -6,6 +6,7 @@

import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.state.PartitionState;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@@ -27,12 +28,13 @@
* Sequential or not. Because as records are always in commit order, if we've seen a range of offsets, we know we've
* seen all that exist (within said range). So if offset 8 is missing from the partition, we will encode it as having
* been completed (when in fact it doesn't exist), because we only compare against known incompletes, and assume all
* others are complete.
* others are complete. See {@link PartitionState#incompleteOffsets} for more discussion on this.
* <p>
* So, when we deserialize, the INCOMPLETES collection is then restored, and that's what's used to compare to see if a
* record should be skipped or not. So if record 8 is recorded as completed, it will be absent from the restored
* INCOMPLETES list, and we are assured we will never see record 8.
*
* @see PartitionState#incompleteOffsets
* @see RunLengthEncoder
* @see OffsetBitSet
*/
@@ -3,6 +3,7 @@
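The replay decision described in the javadoc above can be sketched as a small predicate. This is a hypothetical helper for illustration only; the real check is made against the deserialised incompletes held in `PartitionState`:

```java
import java.util.Set;

// Hypothetical sketch of the skip decision the javadoc describes: after
// deserialising the offset payload, a record within the already-seen range
// is skipped (treated as complete) unless its offset is in the restored
// INCOMPLETES set. This is why a compacted-away offset like 8 is encoded
// as complete: only known incompletes are tracked.
public class IncompleteCheck {
    static boolean shouldSkip(long offset, long highestSeenOffset, Set<Long> restoredIncompletes) {
        if (offset > highestSeenOffset) {
            return false; // beyond the seen range - never processed, so process it
        }
        return !restoredIncompletes.contains(offset);
    }
}
```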
/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
@@ -15,6 +16,11 @@
import static io.confluent.parallelconsumer.offsets.OffsetEncoding.Version.v1;
import static io.confluent.parallelconsumer.offsets.OffsetEncoding.Version.v2;

/**
* Offset encoding MagicNumbers to {@link OffsetEncoder}.
*
* @author Antony Stubbs
*/
@ToString
@RequiredArgsConstructor
public enum OffsetEncoding {
@@ -72,6 +72,7 @@ public static class HighestOffsetAndIncompletes {
/**
* Of the offsets encoded, the incomplete ones.
*/
// todo change to List as Sets have no order
Set<Long> incompleteOffsets;

public static HighestOffsetAndIncompletes of(long highestSeenOffset) {
@@ -164,7 +165,7 @@ public static HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase
PartitionState<K, V> decodePartitionState(TopicPartition tp, OffsetAndMetadata offsetData) throws OffsetDecodingError {
HighestOffsetAndIncompletes incompletes = deserialiseIncompleteOffsetMapFromBase64(offsetData);
log.debug("Loaded incomplete offsets from offset payload {}", incompletes);
return new PartitionState<K, V>(tp, incompletes);
return new PartitionState<>(tp, incompletes);
}

public String makeOffsetMetadataPayload(long baseOffsetForPartition, PartitionState<K, V> state) throws NoEncodingPossibleException {