diff --git a/docs/changelog/121552.yaml b/docs/changelog/121552.yaml new file mode 100644 index 0000000000000..c12e7615d1245 --- /dev/null +++ b/docs/changelog/121552.yaml @@ -0,0 +1,5 @@ +pr: 121552 +summary: Fix a bug in TOP +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java index 63318a2189908..ca89e6f999641 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/DoubleBucketedSort.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntVector; @@ -101,7 +102,7 @@ public void collect(double value, int bucket) { // Gathering mode long requiredSize = rootIndex + bucketSize; if (values.size() < requiredSize) { - grow(requiredSize); + grow(bucket); } int next = getNextGatherOffset(rootIndex); assert 0 <= next && next < bucketSize @@ -257,19 +258,25 @@ private void swap(long lhs, long rhs) { /** * Allocate storage for more buckets and store the "next gather offset" - * for those new buckets. + * for those new buckets. We always grow the storage by whole bucket's + * worth of slots at a time. We never allocate space for partial buckets. */ - private void grow(long minSize) { + private void grow(int bucket) { long oldMax = values.size(); - values = bigArrays.grow(values, minSize); + assert oldMax % bucketSize == 0; + + long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.DOUBLE_PAGE_SIZE, Double.BYTES); + // Round up to the next full bucket. + newSize = (newSize + bucketSize - 1) / bucketSize; + values = bigArrays.resize(values, newSize * bucketSize); // Set the next gather offsets for all newly allocated buckets. - setNextGatherOffsets(oldMax - (oldMax % getBucketSize())); + fillGatherOffsets(oldMax); } /** * Maintain the "next gather offsets" for newly allocated buckets. */ - private void setNextGatherOffsets(long startingAt) { + private void fillGatherOffsets(long startingAt) { int nextOffset = getBucketSize() - 1; for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) { setNextGatherOffset(bucketRoot, nextOffset); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java index b490fe193c33f..2bf8edd99f48c 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/FloatBucketedSort.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.FloatArray; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntVector; @@ -101,7 +102,7 @@ public void collect(float value, int bucket) { // Gathering mode long requiredSize = rootIndex + bucketSize; if (values.size() < requiredSize) { - grow(requiredSize); + grow(bucket); } int next = getNextGatherOffset(rootIndex); assert 0 <= next && next < bucketSize @@ -257,19 +258,25 @@ private void swap(long lhs, long rhs) { /** * Allocate storage for more buckets and store the "next gather offset" - * for those new buckets. + * for those new buckets. We always grow the storage by whole bucket's + * worth of slots at a time. We never allocate space for partial buckets. */ - private void grow(long minSize) { + private void grow(int bucket) { long oldMax = values.size(); - values = bigArrays.grow(values, minSize); + assert oldMax % bucketSize == 0; + + long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.FLOAT_PAGE_SIZE, Float.BYTES); + // Round up to the next full bucket. + newSize = (newSize + bucketSize - 1) / bucketSize; + values = bigArrays.resize(values, newSize * bucketSize); // Set the next gather offsets for all newly allocated buckets. - setNextGatherOffsets(oldMax - (oldMax % getBucketSize())); + fillGatherOffsets(oldMax); } /** * Maintain the "next gather offsets" for newly allocated buckets. */ - private void setNextGatherOffsets(long startingAt) { + private void fillGatherOffsets(long startingAt) { int nextOffset = getBucketSize() - 1; for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) { setNextGatherOffset(bucketRoot, nextOffset); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java index 04a635d75fe52..257dfe2ebb0bd 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/IntBucketedSort.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntVector; @@ -101,7 +102,7 @@ public void collect(int value, int bucket) { // Gathering mode long requiredSize = rootIndex + bucketSize; if (values.size() < requiredSize) { - grow(requiredSize); + grow(bucket); } int next = getNextGatherOffset(rootIndex); assert 0 <= next && next < bucketSize @@ -257,19 +258,25 @@ private void swap(long lhs, long rhs) { /** * Allocate storage for more buckets and store the "next gather offset" - * for those new buckets. + * for those new buckets. We always grow the storage by whole bucket's + * worth of slots at a time. We never allocate space for partial buckets. */ - private void grow(long minSize) { + private void grow(int bucket) { long oldMax = values.size(); - values = bigArrays.grow(values, minSize); + assert oldMax % bucketSize == 0; + + long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.INT_PAGE_SIZE, Integer.BYTES); + // Round up to the next full bucket. + newSize = (newSize + bucketSize - 1) / bucketSize; + values = bigArrays.resize(values, newSize * bucketSize); // Set the next gather offsets for all newly allocated buckets. - setNextGatherOffsets(oldMax - (oldMax % getBucketSize())); + fillGatherOffsets(oldMax); } /** * Maintain the "next gather offsets" for newly allocated buckets. */ - private void setNextGatherOffsets(long startingAt) { + private void fillGatherOffsets(long startingAt) { int nextOffset = getBucketSize() - 1; for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) { setNextGatherOffset(bucketRoot, nextOffset); diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java index e08c25256944b..c27467ebb60ff 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/sort/LongBucketedSort.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntVector; @@ -101,7 +102,7 @@ public void collect(long value, int bucket) { // Gathering mode long requiredSize = rootIndex + bucketSize; if (values.size() < requiredSize) { - grow(requiredSize); + grow(bucket); } int next = getNextGatherOffset(rootIndex); assert 0 <= next && next < bucketSize @@ -257,19 +258,25 @@ private void swap(long lhs, long rhs) { /** * Allocate storage for more buckets and store the "next gather offset" - * for those new buckets. + * for those new buckets. We always grow the storage by whole bucket's + * worth of slots at a time. We never allocate space for partial buckets. */ - private void grow(long minSize) { + private void grow(int bucket) { long oldMax = values.size(); - values = bigArrays.grow(values, minSize); + assert oldMax % bucketSize == 0; + + long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.LONG_PAGE_SIZE, Long.BYTES); + // Round up to the next full bucket. + newSize = (newSize + bucketSize - 1) / bucketSize; + values = bigArrays.resize(values, newSize * bucketSize); // Set the next gather offsets for all newly allocated buckets. - setNextGatherOffsets(oldMax - (oldMax % getBucketSize())); + fillGatherOffsets(oldMax); } /** * Maintain the "next gather offsets" for newly allocated buckets. */ - private void setNextGatherOffsets(long startingAt) { + private void fillGatherOffsets(long startingAt) { int nextOffset = getBucketSize() - 1; for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) { setNextGatherOffset(bucketRoot, nextOffset); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java index 6dca94b9bc79a..63d79a9198622 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/BytesRefBucketedSort.java @@ -8,10 +8,12 @@ package org.elasticsearch.compute.data.sort; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteUtils; import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntVector; @@ -29,6 +31,11 @@ /** * Aggregates the top N variable length {@link BytesRef} values per bucket. * See {@link BucketedSort} for more information. + *
+ * This is substantially different from {@link IpBucketedSort} because + * this has to handle variable length byte strings. To do that it allocates + * a heap of {@link BreakingBytesRefBuilder}s. + *
*/ public class BytesRefBucketedSort implements Releasable { private final BucketedSortCommon common; @@ -123,7 +130,7 @@ public void collect(BytesRef value, int bucket) { // Gathering mode long requiredSize = common.endIndex(rootIndex); if (values.size() < requiredSize) { - grow(requiredSize); + grow(bucket); } int next = getNextGatherOffset(rootIndex); common.assertValidNextOffset(next); @@ -271,13 +278,23 @@ private void swap(long lhs, long rhs) { /** * Allocate storage for more buckets and store the "next gather offset" - * for those new buckets. + * for those new buckets. We always grow the storage by whole bucket's + * worth of slots at a time. We never allocate space for partial buckets. */ - private void grow(long requiredSize) { + private void grow(int bucket) { long oldMax = values.size(); - values = common.bigArrays.grow(values, requiredSize); + assert oldMax % common.bucketSize == 0; + + long newSize = BigArrays.overSize( + ((long) bucket + 1) * common.bucketSize, + PageCacheRecycler.OBJECT_PAGE_SIZE, + RamUsageEstimator.NUM_BYTES_OBJECT_REF + ); + // Round up to the next full bucket. + newSize = (newSize + common.bucketSize - 1) / common.bucketSize; + values = common.bigArrays.resize(values, newSize * common.bucketSize); // Set the next gather offsets for all newly allocated buckets. - fillGatherOffsets(oldMax - (oldMax % common.bucketSize)); + fillGatherOffsets(oldMax); } /** @@ -296,6 +313,7 @@ private void fillGatherOffsets(long startingAt) { bytes.grow(Integer.BYTES); bytes.setLength(Integer.BYTES); ByteUtils.writeIntLE(nextOffset, bytes.bytes(), 0); + checkInvariant(Math.toIntExact(bucketRoot / common.bucketSize)); } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java index 4eb31ea30db22..4392d3994886c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/IpBucketedSort.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; import org.elasticsearch.common.util.ByteUtils; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntVector; @@ -26,6 +27,11 @@ /** * Aggregates the top N IP values per bucket. * See {@link BucketedSort} for more information. + *+ * This is substantially different from {@link BytesRefBucketedSort} because + * this takes advantage of IPs having a fixed length and allocates a dense + * storage for them. + *
*/ public class IpBucketedSort implements Releasable { private static final int IP_LENGTH = 16; // Bytes. It's ipv6. @@ -101,7 +107,7 @@ public void collect(BytesRef value, int bucket) { // Gathering mode long requiredSize = common.endIndex(rootIndex) * IP_LENGTH; if (values.size() < requiredSize) { - grow(requiredSize); + grow(bucket); } int next = getNextGatherOffset(rootIndex); common.assertValidNextOffset(next); @@ -268,17 +274,23 @@ private void swap(long lhs, long rhs) { * Allocate storage for more buckets and store the "next gather offset" * for those new buckets. */ - private void grow(long minSize) { + private void grow(int bucket) { long oldMax = values.size() / IP_LENGTH; - values = common.bigArrays.grow(values, minSize); + assert oldMax % common.bucketSize == 0; + + int bucketBytes = common.bucketSize * IP_LENGTH; + long newSize = BigArrays.overSize(((long) bucket + 1) * bucketBytes, PageCacheRecycler.BYTE_PAGE_SIZE, 1); + // Round up to the next full bucket. + newSize = (newSize + bucketBytes - 1) / bucketBytes; + values = common.bigArrays.resize(values, newSize * bucketBytes); // Set the next gather offsets for all newly allocated buckets. - setNextGatherOffsets(oldMax - (oldMax % common.bucketSize)); + fillGatherOffsets(oldMax); } /** * Maintain the "next gather offsets" for newly allocated buckets. */ - private void setNextGatherOffsets(long startingAt) { + private void fillGatherOffsets(long startingAt) { int nextOffset = common.bucketSize - 1; for (long bucketRoot = startingAt; bucketRoot < values.size() / IP_LENGTH; bucketRoot += common.bucketSize) { setNextGatherOffset(bucketRoot, nextOffset); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st index 6587743e34b6f..095d48021e9c1 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/sort/X-BucketedSort.java.st @@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; import org.elasticsearch.common.util.$Type$Array; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.IntVector; @@ -101,7 +102,7 @@ public class $Type$BucketedSort implements Releasable { // Gathering mode long requiredSize = rootIndex + bucketSize; if (values.size() < requiredSize) { - grow(requiredSize); + grow(bucket); } int next = getNextGatherOffset(rootIndex); assert 0 <= next && next < bucketSize @@ -261,19 +262,25 @@ $endif$ /** * Allocate storage for more buckets and store the "next gather offset" - * for those new buckets. + * for those new buckets. We always grow the storage by whole bucket's + * worth of slots at a time. We never allocate space for partial buckets. */ - private void grow(long minSize) { + private void grow(int bucket) { long oldMax = values.size(); - values = bigArrays.grow(values, minSize); + assert oldMax % bucketSize == 0; + + long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.$TYPE$_PAGE_SIZE, $BYTES$); + // Round up to the next full bucket. + newSize = (newSize + bucketSize - 1) / bucketSize; + values = bigArrays.resize(values, newSize * bucketSize); // Set the next gather offsets for all newly allocated buckets. - setNextGatherOffsets(oldMax - (oldMax % getBucketSize())); + fillGatherOffsets(oldMax); } /** * Maintain the "next gather offsets" for newly allocated buckets. */ - private void setNextGatherOffsets(long startingAt) { + private void fillGatherOffsets(long startingAt) { int nextOffset = getBucketSize() - 1; for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) { setNextGatherOffset(bucketRoot, nextOffset); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/sort/BucketedSortTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/sort/BucketedSortTestCase.java index 78ed096c10b3f..2358643dc089e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/sort/BucketedSortTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/sort/BucketedSortTestCase.java @@ -409,6 +409,42 @@ public final void testMergeThisBigger() { } } + public final void testMergePastEnd() { + int buckets = 10000; + int bucketSize = between(1, 1000); + int target = between(0, buckets); + List