From c6c5f73636d457d7341377933462dcf3b6554dce Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Tue, 1 Nov 2022 17:24:43 +0000 Subject: [PATCH] START: SortedSet's all the way down... --- .../parallelconsumer/offsets/OffsetBitSet.java | 12 ++++++------ .../offsets/OffsetMapCodecManager.java | 8 ++++---- .../parallelconsumer/offsets/OffsetRunLength.java | 5 +++-- .../parallelconsumer/state/PartitionState.java | 3 +-- .../state/PartitionStateCommittedOffsetTest.java | 4 ++-- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java index fafc735bb..94b625d1f 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java @@ -3,14 +3,15 @@ /*- * Copyright (C) 2020-2022 Confluent, Inc. */ + import io.confluent.parallelconsumer.internal.InternalRuntimeException; import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes; import lombok.extern.slf4j.Slf4j; import java.nio.ByteBuffer; import java.util.BitSet; -import java.util.HashSet; -import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; import static io.confluent.csid.utils.Range.range; @@ -61,15 +62,14 @@ static HighestOffsetAndIncompletes deserialiseBitSetWrapToIncompletes(OffsetEnco default -> throw new InternalRuntimeException("Invalid state"); }; ByteBuffer slice = wrap.slice(); - Set incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice); + SortedSet incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice); long highestSeenOffset = baseOffset + originalBitsetSize - 1; return HighestOffsetAndIncompletes.of(highestSeenOffset, incompletes); } - static Set deserialiseBitSetToIncompletes(long baseOffset, int originalBitsetSize, ByteBuffer inputBuffer) { + static SortedSet deserialiseBitSetToIncompletes(long baseOffset, int originalBitsetSize, ByteBuffer inputBuffer) { BitSet bitSet = BitSet.valueOf(inputBuffer); - int numberOfIncompletes = originalBitsetSize - bitSet.cardinality(); - var incompletes = new HashSet(numberOfIncompletes); + var incompletes = new TreeSet(); for (long relativeOffsetLong : range(originalBitsetSize)) { // range will already have been checked at initialization var relativeOffset = Math.toIntExact(relativeOffsetLong); diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java index cd4010775..3455fae46 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetMapCodecManager.java @@ -77,18 +77,18 @@ public static class HighestOffsetAndIncompletes { * Of the offsets encoded, the incomplete ones. */ // todo change to List as Sets have no order - Set incompleteOffsets; + SortedSet incompleteOffsets; public static HighestOffsetAndIncompletes of(long highestSeenOffset) { - return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new HashSet<>()); + return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new TreeSet<>()); } - public static HighestOffsetAndIncompletes of(long highestSeenOffset, Set incompleteOffsets) { + public static HighestOffsetAndIncompletes of(long highestSeenOffset, SortedSet incompleteOffsets) { return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), incompleteOffsets); } public static HighestOffsetAndIncompletes of() { - return new HighestOffsetAndIncompletes(Optional.empty(), new HashSet<>()); + return new HighestOffsetAndIncompletes(Optional.empty(), new TreeSet<>()); } } diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java index e0784c16f..71199b727 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java @@ -3,6 +3,7 @@ /*- * Copyright (C) 2020-2022 Confluent, Inc. */ + import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -12,8 +13,8 @@ import java.nio.IntBuffer; import java.nio.ShortBuffer; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -80,7 +81,7 @@ static HighestOffsetAndIncompletes runLengthDecodeToIncompletes(OffsetEncoding e final ShortBuffer v1ShortBuffer = in.asShortBuffer(); final IntBuffer v2IntegerBuffer = in.asIntBuffer(); - final var incompletes = new HashSet(); // we don't know the capacity yet + final var incompletes = new TreeSet(); long highestSeenOffset = 0L; diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java index 9a9e92df6..283c762be 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java @@ -17,7 +17,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import pl.tlinkowski.unij.api.UniSets; import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; @@ -346,7 +345,7 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) { // reset var resetHighestSeenOffset = Optional.empty(); - var resetIncompletesMap = UniSets.of(); + var resetIncompletesMap = new TreeSet(); var offsetData = new OffsetMapCodecManager.HighestOffsetAndIncompletes(resetHighestSeenOffset, resetIncompletesMap); initStateFromOffsetData(offsetData); } diff --git a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java index c7096e6c3..cc1378dd5 100644 --- a/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java +++ b/parallel-consumer-core/src/test/java/io/confluent/parallelconsumer/state/PartitionStateCommittedOffsetTest.java @@ -15,10 +15,10 @@ import pl.tlinkowski.unij.api.UniLists; import pl.tlinkowski.unij.api.UniSets; -import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.TreeSet; import java.util.stream.Collectors; import static io.confluent.parallelconsumer.ManagedTruth.assertThat; @@ -51,7 +51,7 @@ class PartitionStateCommittedOffsetTest { .filter(offset -> offset >= unexpectedlyHighOffset) .collect(Collectors.toList()); - HighestOffsetAndIncompletes offsetData = new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new HashSet<>(incompletes)); + HighestOffsetAndIncompletes offsetData = new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new TreeSet<>(incompletes)); PartitionState state = new PartitionState<>(0, mu.getModule(), tp, offsetData);