Skip to content

Commit

Permalink
START: SortedSet's all the way down...
Browse files Browse the repository at this point in the history
  • Loading branch information
astubbs committed Nov 2, 2022
1 parent 66b0158 commit c6c5f73
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes;
import lombok.extern.slf4j.Slf4j;

import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import static io.confluent.csid.utils.Range.range;

Expand Down Expand Up @@ -61,15 +62,14 @@ static HighestOffsetAndIncompletes deserialiseBitSetWrapToIncompletes(OffsetEnco
default -> throw new InternalRuntimeException("Invalid state");
};
ByteBuffer slice = wrap.slice();
Set<Long> incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice);
SortedSet<Long> incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice);
long highestSeenOffset = baseOffset + originalBitsetSize - 1;
return HighestOffsetAndIncompletes.of(highestSeenOffset, incompletes);
}

static Set<Long> deserialiseBitSetToIncompletes(long baseOffset, int originalBitsetSize, ByteBuffer inputBuffer) {
static SortedSet<Long> deserialiseBitSetToIncompletes(long baseOffset, int originalBitsetSize, ByteBuffer inputBuffer) {
BitSet bitSet = BitSet.valueOf(inputBuffer);
int numberOfIncompletes = originalBitsetSize - bitSet.cardinality();
var incompletes = new HashSet<Long>(numberOfIncompletes);
var incompletes = new TreeSet<Long>();
for (long relativeOffsetLong : range(originalBitsetSize)) {
// range will already have been checked at initialization
var relativeOffset = Math.toIntExact(relativeOffsetLong);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ public static class HighestOffsetAndIncompletes {
* Of the offsets encoded, the incomplete ones.
*/
// todo change to List as Sets have no order
Set<Long> incompleteOffsets;
SortedSet<Long> incompleteOffsets;

public static HighestOffsetAndIncompletes of(long highestSeenOffset) {
return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new HashSet<>());
return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new TreeSet<>());
}

public static HighestOffsetAndIncompletes of(long highestSeenOffset, Set<Long> incompleteOffsets) {
public static HighestOffsetAndIncompletes of(long highestSeenOffset, SortedSet<Long> incompleteOffsets) {
return new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), incompleteOffsets);
}

public static HighestOffsetAndIncompletes of() {
return new HighestOffsetAndIncompletes(Optional.empty(), new HashSet<>());
return new HighestOffsetAndIncompletes(Optional.empty(), new TreeSet<>());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.parallelconsumer.offsets.OffsetMapCodecManager.HighestOffsetAndIncompletes;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -12,8 +13,8 @@
import java.nio.IntBuffer;
import java.nio.ShortBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
Expand Down Expand Up @@ -80,7 +81,7 @@ static HighestOffsetAndIncompletes runLengthDecodeToIncompletes(OffsetEncoding e
final ShortBuffer v1ShortBuffer = in.asShortBuffer();
final IntBuffer v2IntegerBuffer = in.asIntBuffer();

final var incompletes = new HashSet<Long>(); // we don't know the capacity yet
final var incompletes = new TreeSet<Long>();

long highestSeenOffset = 0L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import pl.tlinkowski.unij.api.UniSets;

import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -346,7 +345,7 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {

// reset
var resetHighestSeenOffset = Optional.<Long>empty();
var resetIncompletesMap = UniSets.<Long>of();
var resetIncompletesMap = new TreeSet<Long>();
var offsetData = new OffsetMapCodecManager.HighestOffsetAndIncompletes(resetHighestSeenOffset, resetIncompletesMap);
initStateFromOffsetData(offsetData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

import static io.confluent.parallelconsumer.ManagedTruth.assertThat;
Expand Down Expand Up @@ -51,7 +51,7 @@ class PartitionStateCommittedOffsetTest {
.filter(offset -> offset >= unexpectedlyHighOffset)
.collect(Collectors.toList());

HighestOffsetAndIncompletes offsetData = new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new HashSet<>(incompletes));
HighestOffsetAndIncompletes offsetData = new HighestOffsetAndIncompletes(Optional.of(highestSeenOffset), new TreeSet<>(incompletes));

PartitionState<String, String> state = new PartitionState<>(0, mu.getModule(), tp, offsetData);

Expand Down

0 comments on commit c6c5f73

Please sign in to comment.