diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java index 652defa7b39cd..056f7810677a2 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java @@ -60,6 +60,9 @@ import java.util.stream.LongStream; import java.util.stream.Stream; +/** + * Benchmark for many different kinds of aggregators and groupings. + */ @Warmup(iterations = 5) @Measurement(iterations = 7) @BenchmarkMode(Mode.AverageTime) diff --git a/docs/changelog/123073.yaml b/docs/changelog/123073.yaml new file mode 100644 index 0000000000000..95c6a4cbfd6b9 --- /dev/null +++ b/docs/changelog/123073.yaml @@ -0,0 +1,5 @@ +pr: 123073 +summary: Speed up VALUES for many buckets +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java index 602fd29433193..682a1e51ff3fc 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java @@ -8,6 +8,7 @@ package org.elasticsearch.compute.aggregation; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BytesRefHash; import org.elasticsearch.common.util.LongLongHash; @@ -138,47 +139,128 @@ void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContex blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** 
+ * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - BytesRef scratch = new BytesRef(); - try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. 
We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - long first = 0; - for (int id = 0; id < values.size(); id++) { - if (values.getKey1(id) == selectedGroup) { - long value = values.getKey2(id); - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendBytesRef(bytes.get(first, scratch)); - builder.appendBytesRef(bytes.get(value, scratch)); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. + */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. 
+ */ + BytesRef scratch = new BytesRef(); + try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start], scratch); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i], scratch); } - default -> builder.appendBytesRef(bytes.get(value, scratch)); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendBytesRef(bytes.get(first, scratch)); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } - void enableGroupIdTracking(SeenGroupIds seen) { + private void append(BytesRefBlock.Builder builder, int id, BytesRef scratch) { + BytesRef value = bytes.get(values.getKey2(id), scratch); + builder.appendBytesRef(value); + } + + public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java index a8409367bc090..505d3a91991ec 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.aggregation; +import org.apache.lucene.util.RamUsageEstimator; import 
org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.util.LongLongHash; @@ -129,46 +130,127 @@ void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContex blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - try (DoubleBlock.Builder builder = blockFactory.newDoubleBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. 
+ * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - double first = 0; - for (int id = 0; id < values.size(); id++) { - if (values.getKey1(id) == selectedGroup) { - double value = Double.longBitsToDouble(values.getKey2(id)); - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendDouble(first); - builder.appendDouble(value); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. 
+ */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ + try (DoubleBlock.Builder builder = blockFactory.newDoubleBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]); } - default -> builder.appendDouble(value); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendDouble(first); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } - void enableGroupIdTracking(SeenGroupIds seen) { + private void append(DoubleBlock.Builder builder, int id) { + double value = Double.longBitsToDouble(values.getKey2(id)); + builder.appendDouble(value); + } + + public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java index f9e5e1b7b283a..9c50552110183 100644 --- 
a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.aggregation; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.compute.ann.Aggregator; @@ -134,48 +135,130 @@ void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContex blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - try (FloatBlock.Builder builder = blockFactory.newFloatBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. 
+ */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. 
- */ - int count = 0; - float first = 0; - for (int id = 0; id < values.size(); id++) { - long both = values.get(id); - int group = (int) (both >>> Float.SIZE); - if (group == selectedGroup) { - float value = Float.intBitsToFloat((int) both); - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendFloat(first); - builder.appendFloat(value); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. + */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. 
+ */ + try (FloatBlock.Builder builder = blockFactory.newFloatBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]); } - default -> builder.appendFloat(value); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendFloat(first); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } - void enableGroupIdTracking(SeenGroupIds seen) { + private void append(FloatBlock.Builder builder, int id) { + long both = values.get(id); + float value = Float.intBitsToFloat((int) both); + builder.appendFloat(value); + } + + public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java index 2420dcee70712..1e0ca72b8d1a6 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.aggregation; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.compute.ann.Aggregator; @@ -134,48 
+135,130 @@ void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContex blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. 
+ * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - int first = 0; - for (int id = 0; id < values.size(); id++) { - long both = values.get(id); - int group = (int) (both >>> Integer.SIZE); - if (group == selectedGroup) { - int value = (int) both; - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendInt(first); - builder.appendInt(value); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. 
+ */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ + try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]); } - default -> builder.appendInt(value); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendInt(first); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } - void enableGroupIdTracking(SeenGroupIds seen) { + private void append(IntBlock.Builder builder, int id) { + long both = values.get(id); + int value = (int) both; + builder.appendInt(value); + } + + public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block } diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java index 4938b8f15edb0..ba04f928b9fb9 100644 --- 
a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.aggregation; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.util.LongLongHash; @@ -129,46 +130,127 @@ void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContex blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. 
It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - long first = 0; - for (int id = 0; id < values.size(); id++) { - if (values.getKey1(id) == selectedGroup) { - long value = values.getKey2(id); - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendLong(first); - builder.appendLong(value); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. 
+ */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ + try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]); } - default -> builder.appendLong(value); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendLong(first); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } - void enableGroupIdTracking(SeenGroupIds seen) { + private void append(LongBlock.Builder builder, int id) { + long value = values.getKey2(id); + builder.appendLong(value); + } + + public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st index a8884c58116f3..69243332449f6 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st +++ 
b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.aggregation;
 $if(BytesRef)$
 import org.apache.lucene.util.BytesRef;
 $endif$
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.util.BigArrays;
 $if(BytesRef)$
 import org.elasticsearch.common.util.BytesRefHash;
@@ -255,64 +256,158 @@ $endif$
             blocks[offset] = toBlock(driverContext.blockFactory(), selected);
         }
 
+        /**
+         * Builds a {@link Block} with the unique values collected for the {@code selected}
+         * groups. This is the implementation of the final and intermediate results of the agg.
+         */
         Block toBlock(BlockFactory blockFactory, IntVector selected) {
             if (values.size() == 0) {
                 return blockFactory.newConstantNullBlock(selected.getPositionCount());
             }
-$if(BytesRef)$
-            BytesRef scratch = new BytesRef();
+
+            long selectedCountsSize = 0;
+            long idsSize = 0;
+            try {
+                /*
+                 * Count the values of every group up to the maximum selected group. Count
+                 * *downwards* so that we can flip the sign on all of the actually selected
+                 * groups; what stays negative is unselected. The byte size is computed as a long.
+                 */
+                int selectedCountsLen = selected.max() + 1;
+                long adjust = RamUsageEstimator.alignObjectSize(
+                    RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) selectedCountsLen * Integer.BYTES
+                );
+                blockFactory.adjustBreaker(adjust);
+                selectedCountsSize = adjust;
+                int[] selectedCounts = new int[selectedCountsLen];
+                for (int id = 0; id < values.size(); id++) {
+$if(long||BytesRef||double)$
+                    int group = (int) values.getKey1(id);
+$elseif(float||int)$
+                    long both = values.get(id);
+                    int group = (int) (both >>> Float.SIZE);
 $endif$
-            try ($Type$Block.Builder builder = blockFactory.new$Type$BlockBuilder(selected.getPositionCount())) {
+                    if (group < selectedCounts.length) {
+                        selectedCounts[group]--;
+                    }
+                }
+
+                /*
+                 * Total the selected groups and turn the counts into the start index into a sort-of
+                 * off-by-one running count. It's really the number of values that have been inserted
+                 * into the results before starting on this group. Unselected groups will still
+                 * have negative counts.
+                 *
+                 * For example, if
+                 * | Group | Value Count | Selected |
+                 * |-------|-------------|----------|
+                 * |     0 |           3 |       <- |
+                 * |     1 |           1 |       <- |
+                 * |     2 |           2 |          |
+                 * |     3 |           1 |       <- |
+                 * |     4 |           4 |       <- |
+                 *
+                 * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5
+                 */
+                int total = 0;
                 for (int s = 0; s < selected.getPositionCount(); s++) {
-                    int selectedGroup = selected.getInt(s);
-                    /*
-                     * Count can effectively be in three states - 0, 1, many. We use those
-                     * states to buffer the first value, so we can avoid calling
-                     * beginPositionEntry on single valued fields.
-                     */
-                    int count = 0;
-                    $if(BytesRef)$long$else$$type$$endif$ first = 0;
-                    for (int id = 0; id < values.size(); id++) {
-$if(long||BytesRef)$
-                        if (values.getKey1(id) == selectedGroup) {
-                            long value = values.getKey2(id);
-$elseif(double)$
-                        if (values.getKey1(id) == selectedGroup) {
-                            double value = Double.longBitsToDouble(values.getKey2(id));
-$elseif(float)$
-                        long both = values.get(id);
-                        int group = (int) (both >>> Float.SIZE);
-                        if (group == selectedGroup) {
-                            float value = Float.intBitsToFloat((int) both);
-$elseif(int)$
-                        long both = values.get(id);
-                        int group = (int) (both >>> Integer.SIZE);
-                        if (group == selectedGroup) {
-                            int value = (int) both;
+                    int group = selected.getInt(s);
+                    int count = -selectedCounts[group];
+                    selectedCounts[group] = total;
+                    total += count;
+                }
+
+                /*
+                 * Build a list of ids to insert in order *and* convert the running
+                 * count in selectedCounts[group] into the end index (exclusive) in
+                 * ids for each group. The breaker size is computed as a long to avoid int overflow.
+                 * Here we use the negative counts to signal that a group hasn't been
+                 * selected and the id containing values for that group is ignored.
+                 *
+                 * For example, if
+                 * | Group | Value Count | Selected |
+                 * |-------|-------------|----------|
+                 * |     0 |           3 |       <- |
+                 * |     1 |           1 |       <- |
+                 * |     2 |           2 |          |
+                 * |     3 |           1 |       <- |
+                 * |     4 |           4 |       <- |
+                 *
+                 * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5.
+                 * The counts will end with 3, 4, -2, 5, 9.
+                 */
+                adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + (long) total * Integer.BYTES);
+                blockFactory.adjustBreaker(adjust);
+                idsSize = adjust;
+                int[] ids = new int[total];
+                for (int id = 0; id < values.size(); id++) {
+$if(long||BytesRef||double)$
+                    int group = (int) values.getKey1(id);
+$elseif(float||int)$
+                    long both = values.get(id);
+                    int group = (int) (both >>> Float.SIZE);
+$endif$
+                    if (group < selectedCounts.length && selectedCounts[group] >= 0) {
+                        ids[selectedCounts[group]++] = id;
+                    }
+                }
+
+                /*
+                 * Insert the ids in order.
+                 */
+$if(BytesRef)$
+                BytesRef scratch = new BytesRef();
 $endif$
-                            switch (count) {
-                                case 0 -> first = value;
-                                case 1 -> {
-                                    builder.beginPositionEntry();
-                                    builder.append$Type$($if(BytesRef)$bytes.get(first, scratch)$else$first$endif$);
-                                    builder.append$Type$($if(BytesRef)$bytes.get(value, scratch)$else$value$endif$);
+                try ($Type$Block.Builder builder = blockFactory.new$Type$BlockBuilder(selected.getPositionCount())) {
+                    int start = 0;
+                    for (int s = 0; s < selected.getPositionCount(); s++) {
+                        int group = selected.getInt(s);
+                        int end = selectedCounts[group];
+                        int count = end - start;
+                        switch (count) {
+                            case 0 -> builder.appendNull();
+                            case 1 -> append(builder, ids[start]$if(BytesRef)$, scratch$endif$);
+                            default -> {
+                                builder.beginPositionEntry();
+                                for (int i = start; i < end; i++) {
+                                    append(builder, ids[i]$if(BytesRef)$, scratch$endif$);
                                 }
-                                default -> builder.append$Type$($if(BytesRef)$bytes.get(value, scratch)$else$value$endif$);
+                                builder.endPositionEntry();
                             }
-                            count++;
                         }
+                        start = end;
                     }
-                    switch (count) {
-                        case 0 -> builder.appendNull();
-                        case 1 -> builder.append$Type$($if(BytesRef)$bytes.get(first, scratch)$else$first$endif$);
-                        default -> builder.endPositionEntry();
-                    }
+                    return builder.build();
                 }
-                return builder.build();
+            } finally {
+                blockFactory.adjustBreaker(-selectedCountsSize - idsSize);
             }
         }
 
-        void enableGroupIdTracking(SeenGroupIds seen) {
+$if(BytesRef)$
+        private void append($Type$Block.Builder builder, int id, BytesRef scratch) {
+            BytesRef value = bytes.get(values.getKey2(id), scratch);
+            builder.appendBytesRef(value);
+        }
+
+$else$
+        private void append($Type$Block.Builder builder, int id) {
+$if(long)$
+            long value = values.getKey2(id);
+$elseif(double)$
+            double value = Double.longBitsToDouble(values.getKey2(id));
+$elseif(float)$
+            long both = values.get(id);
+            float value = Float.intBitsToFloat((int) both);
+$elseif(int)$
+            long both = values.get(id);
+            int value = (int) both;
+$endif$
+            builder.append$Type$(value);
+        }
+
+$endif$
+        public void enableGroupIdTracking(SeenGroupIds seen) {
             // we figure out seen values from nulls on the values block
         }