ESQL: Fix a bug in TOP (elastic#121552)
Fix a bug in TOP which surfaces when merging results from ordinals. We
weren't always accounting for oversized arrays when checking if we'd
ever seen a field. This changes the oversize itself to always size on a bucket boundary.

The test for this required a random `bucketSize`; without that, the
oversizing frequently wouldn't cause trouble.
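
In other words, whenever the backing array grows, it now grows to a whole number of buckets. A minimal sketch of that sizing rule, with an illustrative class name and a made-up oversized value (the real code gets the oversized size from `BigArrays.overSize`, using `PageCacheRecycler` page sizes and per-element byte widths):

```java
// Hedged sketch of the "size on a bucket boundary" rule this commit introduces.
// `overSized` stands in for whatever the oversizing heuristic returns; it can
// land mid-bucket, so we round it up to the next whole bucket.
public class BucketBoundarySizing {
    static long sizeOnBucketBoundary(long overSized, int bucketSize) {
        long wholeBuckets = (overSized + bucketSize - 1) / bucketSize; // round up
        return wholeBuckets * bucketSize;                              // never a partial bucket
    }

    public static void main(String[] args) {
        int bucketSize = 7;
        // Bucket 4 needs at least (4 + 1) * 7 = 35 slots; suppose oversizing asks for 40.
        System.out.println(sizeOnBucketBoundary(40, bucketSize)); // prints 42, the next multiple of 7
    }
}
```

`BytesRefBucketedSort`, `IpBucketedSort`, and the templated numeric sorts below all apply the same rounding, scaled by their per-entry sizes.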
nik9000 committed Feb 5, 2025
1 parent f7bd10b commit fa967d5
Showing 9 changed files with 146 additions and 40 deletions.
docs/changelog/121552.yaml (5 additions, 0 deletions)
@@ -0,0 +1,5 @@
+ pr: 121552
+ summary: Fix a bug in TOP
+ area: ES|QL
+ type: bug
+ issues: []

Some generated files are not rendered by default.

org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java
@@ -8,10 +8,12 @@
package org.elasticsearch.compute.data.sort;

import org.apache.lucene.util.BytesRef;
+ import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteUtils;
import org.elasticsearch.common.util.ObjectArray;
+ import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntVector;
@@ -29,6 +31,11 @@
/**
* Aggregates the top N variable length {@link BytesRef} values per bucket.
* See {@link BucketedSort} for more information.
+ * <p>
+ * This is substantially different from {@link IpBucketedSort} because
+ * this has to handle variable length byte strings. To do that it allocates
+ * a heap of {@link BreakingBytesRefBuilder}s.
+ * </p>
*/
public class BytesRefBucketedSort implements Releasable {
private final BucketedSortCommon common;
@@ -123,7 +130,7 @@ public void collect(BytesRef value, int bucket) {
// Gathering mode
long requiredSize = common.endIndex(rootIndex);
if (values.size() < requiredSize) {
- grow(requiredSize);
+ grow(bucket);
}
int next = getNextGatherOffset(rootIndex);
common.assertValidNextOffset(next);
@@ -271,13 +278,23 @@ private void swap(long lhs, long rhs) {

/**
* Allocate storage for more buckets and store the "next gather offset"
- * for those new buckets.
+ * for those new buckets. We always grow the storage by whole bucket's
+ * worth of slots at a time. We never allocate space for partial buckets.
*/
- private void grow(long requiredSize) {
+ private void grow(int bucket) {
long oldMax = values.size();
- values = common.bigArrays.grow(values, requiredSize);
+ assert oldMax % common.bucketSize == 0;
+
+ long newSize = BigArrays.overSize(
+     ((long) bucket + 1) * common.bucketSize,
+     PageCacheRecycler.OBJECT_PAGE_SIZE,
+     RamUsageEstimator.NUM_BYTES_OBJECT_REF
+ );
+ // Round up to the next full bucket.
+ newSize = (newSize + common.bucketSize - 1) / common.bucketSize;
+ values = common.bigArrays.resize(values, newSize * common.bucketSize);
// Set the next gather offsets for all newly allocated buckets.
- fillGatherOffsets(oldMax - (oldMax % common.bucketSize));
+ fillGatherOffsets(oldMax);
}

/**
@@ -296,6 +313,7 @@ private void fillGatherOffsets(long startingAt) {
bytes.grow(Integer.BYTES);
bytes.setLength(Integer.BYTES);
ByteUtils.writeIntLE(nextOffset, bytes.bytes(), 0);
+ checkInvariant(Math.toIntExact(bucketRoot / common.bucketSize));
}
}

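To make the new `BytesRefBucketedSort` sizing concrete with made-up numbers (the exact value returned by `BigArrays.overSize` depends on its growth heuristics): with `bucketSize = 3` and `bucket = 5`, the code asks for at least `(5 + 1) * 3 = 18` object slots; if `overSize` happens to return 25, the round-up gives `(25 + 3 - 1) / 3 = 9` whole buckets, so the array is resized to exactly 27 slots. Because `oldMax` is now always a multiple of `bucketSize`, `fillGatherOffsets(oldMax)` can start at the old end of the array without the previous `oldMax - (oldMax % bucketSize)` correction.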
org/elasticsearch/compute/data/sort/IpBucketedSort.java
@@ -11,6 +11,7 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.ByteUtils;
+ import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntVector;
@@ -26,6 +27,11 @@
/**
* Aggregates the top N IP values per bucket.
* See {@link BucketedSort} for more information.
+ * <p>
+ * This is substantially different from {@link BytesRefBucketedSort} because
+ * this takes advantage of IPs having a fixed length and allocates a dense
+ * storage for them.
+ * </p>
*/
public class IpBucketedSort implements Releasable {
private static final int IP_LENGTH = 16; // Bytes. It's ipv6.
@@ -101,7 +107,7 @@ public void collect(BytesRef value, int bucket) {
// Gathering mode
long requiredSize = common.endIndex(rootIndex) * IP_LENGTH;
if (values.size() < requiredSize) {
- grow(requiredSize);
+ grow(bucket);
}
int next = getNextGatherOffset(rootIndex);
common.assertValidNextOffset(next);
@@ -268,17 +274,23 @@ private void swap(long lhs, long rhs) {
* Allocate storage for more buckets and store the "next gather offset"
* for those new buckets.
*/
- private void grow(long minSize) {
+ private void grow(int bucket) {
long oldMax = values.size() / IP_LENGTH;
- values = common.bigArrays.grow(values, minSize);
+ assert oldMax % common.bucketSize == 0;
+
+ int bucketBytes = common.bucketSize * IP_LENGTH;
+ long newSize = BigArrays.overSize(((long) bucket + 1) * bucketBytes, PageCacheRecycler.BYTE_PAGE_SIZE, 1);
+ // Round up to the next full bucket.
+ newSize = (newSize + bucketBytes - 1) / bucketBytes;
+ values = common.bigArrays.resize(values, newSize * bucketBytes);
// Set the next gather offsets for all newly allocated buckets.
- setNextGatherOffsets(oldMax - (oldMax % common.bucketSize));
+ fillGatherOffsets(oldMax);
}

/**
* Maintain the "next gather offsets" for newly allocated buckets.
*/
- private void setNextGatherOffsets(long startingAt) {
+ private void fillGatherOffsets(long startingAt) {
int nextOffset = common.bucketSize - 1;
for (long bucketRoot = startingAt; bucketRoot < values.size() / IP_LENGTH; bucketRoot += common.bucketSize) {
setNextGatherOffset(bucketRoot, nextOffset);
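The IP variant applies the same rule but sizes in bytes rather than object slots, since every IP is stored as a fixed 16-byte value. Illustrative numbers only (again, the exact `overSize` result depends on its heuristics): with `bucketSize = 4`, one bucket occupies `4 * 16 = 64` bytes; if `overSize` returns, say, 200 bytes, the round-up gives `(200 + 64 - 1) / 64 = 4` whole buckets, so the byte array is resized to 256 bytes and `fillGatherOffsets(oldMax)` (the renamed `setNextGatherOffsets`) initializes the buckets past the old end.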
org/elasticsearch/compute/data/sort/X-BucketedSort.java.st ($Type$BucketedSort template)
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.$Type$Array;
+ import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class $Type$BucketedSort implements Releasable {
// Gathering mode
long requiredSize = rootIndex + bucketSize;
if (values.size() < requiredSize) {
- grow(requiredSize);
+ grow(bucket);
}
int next = getNextGatherOffset(rootIndex);
assert 0 <= next && next < bucketSize
@@ -261,19 +262,25 @@ $endif$

/**
* Allocate storage for more buckets and store the "next gather offset"
- * for those new buckets.
+ * for those new buckets. We always grow the storage by whole bucket's
+ * worth of slots at a time. We never allocate space for partial buckets.
*/
- private void grow(long minSize) {
+ private void grow(int bucket) {
long oldMax = values.size();
- values = bigArrays.grow(values, minSize);
+ assert oldMax % bucketSize == 0;
+
+ long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.$TYPE$_PAGE_SIZE, $BYTES$);
+ // Round up to the next full bucket.
+ newSize = (newSize + bucketSize - 1) / bucketSize;
+ values = bigArrays.resize(values, newSize * bucketSize);
// Set the next gather offsets for all newly allocated buckets.
- setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
+ fillGatherOffsets(oldMax);
}

/**
* Maintain the "next gather offsets" for newly allocated buckets.
*/
- private void setNextGatherOffsets(long startingAt) {
+ private void fillGatherOffsets(long startingAt) {
int nextOffset = getBucketSize() - 1;
for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
setNextGatherOffset(bucketRoot, nextOffset);
(Remaining changed files not shown.)
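The commit message notes that the regression test needed a random `bucketSize` to make the oversizing misbehave. A hedged, self-contained sketch of that idea, as a toy model rather than the actual compute-module test:

```java
import java.util.concurrent.ThreadLocalRandom;

// Toy model of the invariant the fix maintains: growth always lands on a
// bucket boundary, so size / bucketSize is exactly the bucket count.
// Randomizing bucketSize mirrors the test strategy the commit message describes.
public class BucketBoundaryCheck {
    public static void main(String[] args) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int bucketSize = random.nextInt(1, 64); // random bucket size, as in the test
        long size = 0;                          // slots allocated so far
        for (int i = 0; i < 1_000; i++) {
            int bucket = random.nextInt(0, 10_000);
            long wanted = ((long) bucket + 1) * bucketSize;
            if (size >= wanted) {
                continue;                       // already have room for this bucket
            }
            long overSized = wanted + random.nextLong(0, 128); // stand-in for BigArrays.overSize
            size = ((overSized + bucketSize - 1) / bucketSize) * bucketSize;
            if (size % bucketSize != 0 || size < wanted) {
                throw new AssertionError("grew to a partial bucket: " + size);
            }
        }
        System.out.println("ok: bucketSize=" + bucketSize + ", size=" + size);
    }
}
```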
