
Commit

fix: Improvements to encoding ranges (int vs long) (#439)
Integers were still used in a lot of places when dealing with offsets. This moves everything to use Long, and performs exact casts and arithmetic when encoding into integers, shorts, or bytes, for the encoders that do so.
astubbs authored Nov 2, 2022
1 parent 94b28c1 commit 66b0158
Showing 23 changed files with 419 additions and 168 deletions.
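
As a rough sketch of the narrowing pattern described above (illustrative code, not taken from the library internals), offsets stay as long and only drop to smaller types via exact conversions that throw rather than silently truncating; toShortExact here is the MathUtils helper this commit adds below:

[source,java]
----
import io.confluent.csid.utils.MathUtils;

// Illustrative sketch only: narrowing a long offset exactly instead of casting blindly.
public class ExactNarrowingSketch {
    public static void main(String[] args) {
        long relativeOffset = 70_000L;

        // fits in an int, so this succeeds
        int asInt = Math.toIntExact(relativeOffset);
        System.out.println(asInt);

        // does not fit in a short, so the conversion fails loudly instead of wrapping
        try {
            short asShort = MathUtils.toShortExact(relativeOffset);
            System.out.println(asShort);
        } catch (ArithmeticException e) {
            System.out.println("truncation detected: " + e.getMessage());
        }
    }
}
----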
2 changes: 2 additions & 0 deletions CHANGELOG.adoc
@@ -25,6 +25,8 @@ endif::[]
* fixes #409: Adds support for compacted topics and commit offset resetting (#425)
** Truncate the offset state when bootstrap polled offset higher or lower than committed
** Prune missing records from the tracked incomplete offset state, when they're missing from polled batches
* fix: Improvements to encoding ranges (int vs long) #439
** Replace integer offset references with long - use Long everywhere we deal with offsets, and where we truncate down, do it exactly, detecting and handling truncation issues.

== 0.5.2.3

3 changes: 2 additions & 1 deletion README.adoc
@@ -473,7 +473,6 @@ See {issues_link}/12[issue #12], and the `ParallelConsumer` JavaDoc:

[source,java]
----
/**
* Asynchronous / concurrent message consumer for Kafka.
* <p>
@@ -1302,6 +1301,8 @@ endif::[]
* fixes #409: Adds support for compacted topics and commit offset resetting (#425)
** Truncate the offset state when bootstrap polled offset higher or lower than committed
** Prune missing records from the tracked incomplete offset state, when they're missing from polled batches
* fix: Improvements to encoding ranges (int vs long) #439
** Replace integer offset references with long - use Long everywhere we deal with offsets, and where we truncate down, do it exactly, detecting and handling truncation issues.

== 0.5.2.3

@@ -0,0 +1,29 @@
package io.confluent.csid.utils;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.experimental.UtilityClass;

/**
* @author Antony Stubbs
*/
@UtilityClass
public class MathUtils {

/**
* Ensures exact conversion from a Long to a Short.
* <p>
* {@link Math} doesn't have an exact conversion from Long to Short.
*
* @see Math#toIntExact
*/
public static short toShortExact(long value) {
final short shortCast = (short) value;
if (shortCast != value) {
throw new ArithmeticException("short overflow");
}
return shortCast;
}
}
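
A brief usage sketch (values are illustrative) contrasting the exact conversion with a plain narrowing cast:

[source,java]
----
import io.confluent.csid.utils.MathUtils;

// Illustrative only: toShortExact throws on overflow, a plain cast silently wraps.
public class ToShortExactExample {
    public static void main(String[] args) {
        System.out.println(MathUtils.toShortExact(1_234L)); // 1234 - fits in a short
        System.out.println((short) 100_000L);               // -31072 - silent wrap-around
        try {
            MathUtils.toShortExact(100_000L);               // exceeds Short.MAX_VALUE (32767)
        } catch (ArithmeticException e) {
            System.out.println(e.getMessage());             // "short overflow"
        }
    }
}
----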
@@ -1,48 +1,87 @@
package io.confluent.csid.utils;

/*-
* Copyright (C) 2020-2021 Confluent, Inc.
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static java.util.stream.Collectors.toList;

/**
* https://stackoverflow.com/a/16570509/105741
* Class for simple ranges.
* <p>
* <a href="https://stackoverflow.com/a/16570509/105741">For loop - like Python range function</a>
*
* @see #range(long)
*/
public class Range implements Iterable<Integer> {
public class Range implements Iterable<Long> {

private final long start;

private final long limit;

/**
* @see this#range(long)
*/
public Range(int start, long max) {
this.start = start;
this.limit = max;
}

public Range(long limit) {
this.start = 0L;
this.limit = limit;
}

/**
* Exclusive of max
* Provides an {@link Iterable} for the range of numbers from 0 to the given limit.
* <p>
* Exclusive of max.
* <p>
* Consider using {@link IntStream#range(int, int)#forEachOrdered} instead:
* <pre>
* IntStream.range(0, originalBitsetSize).forEachOrdered(offset -> {
* </pre>
* However, if you don't want to use a closure, this is a good alternative.
*/
public static Range range(long max) {
return new Range(max);
}

/**
* @see #range(long)
*/
public static Range range(int start, long max) {
return new Range(start, max);
}

/**
* Potentially slow, but useful for tests
*/
public static List<Integer> listOfIntegers(int max) {
return Range.range(max).listAsIntegers();
}


@Override
public Iterator<Integer> iterator() {
public Iterator<Long> iterator() {
final long max = limit;
return new Iterator<Integer>() {
return new Iterator<>() {

private int current = 0;
private long current = start;

@Override
public boolean hasNext() {
return current < max;
}

@Override
public Integer next() {
public Long next() {
if (hasNext()) {
return current++;
} else {
@@ -57,23 +96,14 @@ public void remove() {
};
}

public List<Integer> list() {
ArrayList<Integer> integers = new ArrayList<Integer>();
forEach(integers::add);
return integers;
}

public IntStream toStream() {
return IntStream.range(0, (int) limit);
}

static IntStream rangeStream(int i) {
return IntStream.range(0, i);
public List<Integer> listAsIntegers() {
return IntStream.range(Math.toIntExact(start), Math.toIntExact(limit))
.boxed()
.collect(toList());
}

static void range(int max, Consumer<Integer> consumer) {
IntStream.range(0, max)
.forEach(consumer::accept);
public LongStream toStream() {
return LongStream.range(start, limit);
}

}
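
A short usage sketch (illustrative values) of the reworked Range, which now iterates Long values and exposes a LongStream:

[source,java]
----
import io.confluent.csid.utils.Range;

import java.util.List;

// Illustrative only: iterating the Long-based Range.
public class RangeExample {
    public static void main(String[] args) {
        for (long offset : Range.range(3)) {           // 0, 1, 2 - exclusive of the limit
            System.out.println(offset);
        }

        long sum = Range.range(2, 5).toStream().sum(); // LongStream of 2, 3, 4 -> 9
        System.out.println(sum);

        List<Integer> ints = Range.listOfIntegers(3);  // [0, 1, 2] - convenience for tests
        System.out.println(ints);
    }
}
----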
@@ -4,10 +4,12 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import io.confluent.csid.utils.MathUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.internal.InternalRuntimeException;
import io.confluent.parallelconsumer.state.PartitionState;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.nio.BufferOverflowException;
@@ -34,94 +36,92 @@
* record should be skipped or not. So if record 8 is recorded as completed, it will be absent from the restored
* INCOMPLETES list, and we are assured we will never see record 8.
*
* @author Antony Stubbs
* @see PartitionState#incompleteOffsets
* @see RunLengthEncoder
* @see OffsetBitSet
* @author Antony Stubbs
*/
@ToString(callSuper = true)
@Slf4j
public class BitSetEncoder extends OffsetEncoder {

private final Version version; // default to new version

private static final Version DEFAULT_VERSION = Version.v2;

/**
* {@link BitSet} only supports {@link Integer#MAX_VALUE} bits
*/
public static final Integer MAX_LENGTH_ENCODABLE = Integer.MAX_VALUE;

@Getter
private final BitSet bitSet;

private final int originalLength;
private final long originalLength;

private Optional<byte[]> encodedBytes = Optional.empty();

public BitSetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) throws BitSetEncodingNotSupportedException {
this(length, offsetSimultaneousEncoder, DEFAULT_VERSION);
}

/**
* @param length the difference between the highest and lowest offset to be encoded
*/
public BitSetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder, Version newVersion) throws BitSetEncodingNotSupportedException {
super(offsetSimultaneousEncoder);

this.version = newVersion;
public BitSetEncoder(long length, OffsetSimultaneousEncoder offsetSimultaneousEncoder, Version newVersion) throws BitSetEncodingNotSupportedException {
super(offsetSimultaneousEncoder, newVersion);

bitSet = new BitSet(length);
// prep bit set buffer, range check above
try {
bitSet = new BitSet(Math.toIntExact(length));
} catch (ArithmeticException e) {
throw new BitSetEncodingNotSupportedException("BitSet only supports " + MAX_LENGTH_ENCODABLE + " bits, but " + length + " were requested", e);
}

this.originalLength = length;
}

private ByteBuffer constructWrappedByteBuffer(final int length, final Version newVersion) throws BitSetEncodingNotSupportedException {
private ByteBuffer constructWrappedByteBuffer(long length, Version newVersion) throws BitSetEncodingNotSupportedException {
return switch (newVersion) {
case v1 -> initV1(length);
case v2 -> initV2(length);
};
}

/**
* Switch from encoding bitset length as a short to an integer (length of 32,000 was reasonable too short).
* Switch from encoding bitset length as a short to an integer (Short.MAX_VALUE size of 32,000 was too short).
* <p>
* Integer.MAX_VALUE should always be good enough as system restricts large from being processed at once.
* Integer.MAX_VALUE is the most we can use, as {@link BitSet} only supports {@link Integer#MAX_VALUE} bits.
*/
// TODO refactor initV2 and V1 together, passing in the Short or Integer
private ByteBuffer initV2(int bitsetEntriesRequired) throws BitSetEncodingNotSupportedException {
private ByteBuffer initV2(long bitsetEntriesRequired) throws BitSetEncodingNotSupportedException {
if (bitsetEntriesRequired > MAX_LENGTH_ENCODABLE) {
// need to upgrade to using Integer for the bitset length, but can't change serialisation format in-place
throw new BitSetEncodingNotSupportedException(StringUtils.msg("BitSet V2 too long to encode, as length overflows Integer.MAX_VALUE. Length: {}. (max: {})", bitsetEntriesRequired, MAX_LENGTH_ENCODABLE));
}

// prep bit set buffer
int bytesRequiredForEntries = (int) (Math.ceil((double) bitsetEntriesRequired / Byte.SIZE));
int lengthEntryWidth = Integer.BYTES;
int wrappedBufferLength = lengthEntryWidth + bytesRequiredForEntries + 1;
final ByteBuffer wrappedBitSetBytesBuffer = ByteBuffer.allocate(wrappedBufferLength);

// bitset doesn't serialise its set capacity, so we have to, as the unused capacity actually means something
wrappedBitSetBytesBuffer.putInt(bitsetEntriesRequired);
wrappedBitSetBytesBuffer.putInt(Math.toIntExact(bitsetEntriesRequired));

return wrappedBitSetBytesBuffer;
}

/**
* This was a bit "short" sighted of me....
*
* @return
* This was a bit "short" sighted of me.... Encodes the capacity of the bitset as a short, which is only ~32,000
* bits ({@link Short#MAX_VALUE}).
*/
private ByteBuffer initV1(int bitsetEntriesRequired) throws BitSetEncodingNotSupportedException {
private ByteBuffer initV1(long bitsetEntriesRequired) throws BitSetEncodingNotSupportedException {
if (bitsetEntriesRequired > Short.MAX_VALUE) {
// need to upgrade to using Integer for the bitset length, but can't change serialisation format in-place
throw new BitSetEncodingNotSupportedException("Input too long to encode for BitSet V1, length overflows Short.MAX_VALUE: " + bitsetEntriesRequired + ". (max: " + Short.MAX_VALUE + ")");
}

// prep bit set buffer
int bytesRequiredForEntries = (int) (Math.ceil((double) bitsetEntriesRequired / Byte.SIZE));
int lengthEntryWidth = Short.BYTES;
int wrappedBufferLength = lengthEntryWidth + bytesRequiredForEntries + 1;
final ByteBuffer wrappedBitSetBytesBuffer = ByteBuffer.allocate(wrappedBufferLength);

// bitset doesn't serialise its set capacity, so we have to, as the unused capacity actually means something
wrappedBitSetBytesBuffer.putShort((short) bitsetEntriesRequired);
wrappedBitSetBytesBuffer.putShort(MathUtils.toShortExact(bitsetEntriesRequired));

return wrappedBitSetBytesBuffer;
}
@@ -143,13 +143,14 @@ protected OffsetEncoding getEncodingTypeCompressed() {
}

@Override
public void encodeIncompleteOffset(final int index) {
public void encodeIncompleteOffset(final long relativeOffset) {
// noop - bitset defaults to 0's (`unset`)
}

@Override
public void encodeCompletedOffset(final int index) {
bitSet.set(index);
public void encodeCompletedOffset(final long relativeOffset) {
// range will already have been checked at initialization
bitSet.set(Math.toIntExact(relativeOffset));
}

@Override
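
To make the v2 layout concrete, here is a minimal standalone sketch (an assumption-level illustration, not the library's API) of a bitset serialised behind a 4-byte length prefix, using the same exact narrowing of the long length that initV2 describes:

[source,java]
----
import java.nio.ByteBuffer;
import java.util.BitSet;

// Standalone illustration of the v2-style layout: an int capacity prefix followed
// by the bitset bytes, where set bits mark completed relative offsets.
public class BitSetLayoutSketch {

    static ByteBuffer encode(BitSet completed, long bitsetEntriesRequired) {
        int entries = Math.toIntExact(bitsetEntriesRequired);            // exact narrowing, as in the commit
        int bytesForEntries = (int) Math.ceil((double) entries / Byte.SIZE);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + bytesForEntries + 1);
        buffer.putInt(entries);              // capacity stored explicitly - unused trailing bits are meaningful
        buffer.put(completed.toByteArray()); // BitSet#toByteArray only covers up to the highest set bit
        return buffer;
    }

    public static void main(String[] args) {
        BitSet completed = new BitSet(10);
        completed.set(3);                    // relative offset base+3 completed
        completed.set(7);                    // relative offset base+7 completed
        System.out.println(encode(completed, 10).position());
    }
}
----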
@@ -4,6 +4,8 @@
* Copyright (C) 2020-2022 Confluent, Inc.
*/

import lombok.ToString;

import java.nio.ByteBuffer;

import static io.confluent.parallelconsumer.offsets.OffsetEncoding.ByteArray;
@@ -15,13 +17,17 @@
*
* @author Antony Stubbs
*/
@ToString(callSuper = true)
public class ByteBufferEncoder extends OffsetEncoder {

private final ByteBuffer bytesBuffer;

public ByteBufferEncoder(final int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
super(offsetSimultaneousEncoder);
this.bytesBuffer = ByteBuffer.allocate(1 + length);
public ByteBufferEncoder(long length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
super(offsetSimultaneousEncoder, OffsetEncoding.Version.v1);

// safe cast the length to an int, as we're not expecting to have more than 2^31 offsets
final int safeCast = Math.toIntExact(length);
this.bytesBuffer = ByteBuffer.allocate(1 + safeCast);
}

@Override
@@ -35,12 +41,12 @@ protected OffsetEncoding getEncodingTypeCompressed() {
}

@Override
public void encodeIncompleteOffset(final int rangeIndex) {
public void encodeIncompleteOffset(final long relativeOffset) {
this.bytesBuffer.put((byte) 0);
}

@Override
public void encodeCompletedOffset(final int rangeIndex) {
public void encodeCompletedOffset(final long relativeOffset) {
this.bytesBuffer.put((byte) 1);
}

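
For contrast with the bitset layout above, a standalone sketch (again illustrative, not the library's API) of the byte-per-offset scheme ByteBufferEncoder uses, with the same exact int narrowing of the long length:

[source,java]
----
import java.nio.ByteBuffer;
import java.util.Set;

// Standalone illustration: one byte per relative offset, 0 = incomplete, 1 = completed.
public class ByteBufferLayoutSketch {

    static ByteBuffer encode(long length, Set<Long> completedRelativeOffsets) {
        // Math.toIntExact throws rather than truncating if the range exceeds Integer.MAX_VALUE
        ByteBuffer buffer = ByteBuffer.allocate(1 + Math.toIntExact(length));
        for (long relativeOffset = 0; relativeOffset < length; relativeOffset++) {
            buffer.put(completedRelativeOffsets.contains(relativeOffset) ? (byte) 1 : (byte) 0);
        }
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println(encode(5, Set.of(1L, 3L)).position()); // 5 bytes written for 5 offsets
    }
}
----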
