diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java index 77c70bc3a10f4..a6ee1cc1d21f5 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java @@ -60,6 +60,9 @@ import java.util.stream.LongStream; import java.util.stream.Stream; +/** + * Benchmark for many different kinds of aggregator and groupings. + */ @Warmup(iterations = 5) @Measurement(iterations = 7) @BenchmarkMode(Mode.AverageTime) diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java new file mode 100644 index 0000000000000..280e6274d84de --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java @@ -0,0 +1,339 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.benchmark.compute.operator; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.AggregatorMode; +import org.elasticsearch.compute.aggregation.ValuesBytesRefAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.ValuesIntAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.ValuesLongAggregatorFunctionSupplier; +import org.elasticsearch.compute.aggregation.blockhash.BlockHash; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.AggregationOperator; +import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.HashAggregationOperator; +import org.elasticsearch.compute.operator.Operator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +/** + * Benchmark for the {@code VALUES} aggregator that supports grouping by many many + * many values. + */ +@Warmup(iterations = 5) +@Measurement(iterations = 7) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Thread) +@Fork(1) +public class ValuesAggregatorBenchmark { + static final int MIN_BLOCK_LENGTH = 8 * 1024; + private static final int OP_COUNT = 1024; + private static final int UNIQUE_VALUES = 6; + private static final BytesRef[] KEYWORDS = new BytesRef[] { + new BytesRef("Tokyo"), + new BytesRef("Delhi"), + new BytesRef("Shanghai"), + new BytesRef("São Paulo"), + new BytesRef("Mexico City"), + new BytesRef("Cairo") }; + static { + assert KEYWORDS.length == UNIQUE_VALUES; + } + + private static final BlockFactory blockFactory = BlockFactory.getInstance( + new NoopCircuitBreaker("noop"), + BigArrays.NON_RECYCLING_INSTANCE // TODO real big arrays? + ); + + static { + // Smoke test all the expected values and force loading subclasses more like prod + try { + for (String groups : ValuesAggregatorBenchmark.class.getField("groups").getAnnotationsByType(Param.class)[0].value()) { + for (String dataType : ValuesAggregatorBenchmark.class.getField("dataType").getAnnotationsByType(Param.class)[0].value()) { + run(Integer.parseInt(groups), dataType, 10); + } + } + } catch (NoSuchFieldException e) { + throw new AssertionError(); + } + } + + private static final String BYTES_REF = "BytesRef"; + private static final String INT = "int"; + private static final String LONG = "long"; + + @Param({ "1", "1000", /*"1000000"*/ }) + public int groups; + + @Param({ BYTES_REF, INT, LONG }) + public String dataType; + + private static Operator operator(DriverContext driverContext, int groups, String dataType) { + if (groups == 1) { + return new AggregationOperator( + List.of(supplier(dataType).aggregatorFactory(AggregatorMode.SINGLE, List.of(0)).apply(driverContext)), + driverContext + ); + } + List groupSpec = List.of(new BlockHash.GroupSpec(0, ElementType.LONG)); + return new HashAggregationOperator( + List.of(supplier(dataType).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(1))), + () -> BlockHash.build(groupSpec, driverContext.blockFactory(), 16 * 1024, false), + driverContext + ); + } + + private static AggregatorFunctionSupplier supplier(String dataType) { + return switch (dataType) { + case BYTES_REF -> new ValuesBytesRefAggregatorFunctionSupplier(); + case INT -> new ValuesIntAggregatorFunctionSupplier(); + case LONG -> new ValuesLongAggregatorFunctionSupplier(); + default -> throw new IllegalArgumentException("unsupported data type [" + dataType + "]"); + }; + } + + private static void checkExpected(int groups, String dataType, Page page) { + String prefix = String.format("[%s][%s]", groups, dataType); + int positions = page.getPositionCount(); + if (positions != groups) { + throw new IllegalArgumentException(prefix + " expected " + groups + " positions, got " + positions); + } + if (groups == 1) { + checkUngrouped(prefix, dataType, page); + return; + } + checkGrouped(prefix, groups, dataType, page); + } + + private static void checkGrouped(String prefix, int groups, String dataType, Page page) { + LongVector groupsVector = page.getBlock(0).asVector(); + for (int p = 0; p < groups; p++) { + long group = groupsVector.getLong(p); + if (group != p) { + throw new IllegalArgumentException(prefix + "[" + p + "] expected group " + p + " but was " + groups); + } + } + switch (dataType) { + case BYTES_REF -> { + // Build the expected values + List> expected = new ArrayList<>(groups); + for (int g = 0; g < groups; g++) { + expected.add(new HashSet<>(KEYWORDS.length)); + } + int blockLength = blockLength(groups); + for (int p = 0; p < blockLength; p++) { + expected.get(p % groups).add(KEYWORDS[p % KEYWORDS.length]); + } + + // Check them + BytesRefBlock values = page.getBlock(1); + for (int p = 0; p < groups; p++) { + checkExpectedBytesRef(prefix, values, p, expected.get(p)); + } + } + case INT -> { + // Build the expected values + List> expected = new ArrayList<>(groups); + for (int g = 0; g < groups; g++) { + expected.add(new HashSet<>(UNIQUE_VALUES)); + } + int blockLength = blockLength(groups); + for (int p = 0; p < blockLength; p++) { + expected.get(p % groups).add(p % KEYWORDS.length); + } + + // Check them + IntBlock values = page.getBlock(1); + for (int p = 0; p < groups; p++) { + checkExpectedInt(prefix, values, p, expected.get(p)); + } + } + case LONG -> { + // Build the expected values + List> expected = new ArrayList<>(groups); + for (int g = 0; g < groups; g++) { + expected.add(new HashSet<>(UNIQUE_VALUES)); + } + int blockLength = blockLength(groups); + for (int p = 0; p < blockLength; p++) { + expected.get(p % groups).add((long) p % KEYWORDS.length); + } + + // Check them + LongBlock values = page.getBlock(1); + for (int p = 0; p < groups; p++) { + checkExpectedLong(prefix, values, p, expected.get(p)); + } + } + default -> throw new IllegalArgumentException(prefix + " unsupported data type " + dataType); + } + } + + private static void checkUngrouped(String prefix, String dataType, Page page) { + switch (dataType) { + case BYTES_REF -> { + BytesRefBlock values = page.getBlock(0); + checkExpectedBytesRef(prefix, values, 0, Set.of(KEYWORDS)); + } + case INT -> { + IntBlock values = page.getBlock(0); + checkExpectedInt(prefix, values, 0, IntStream.range(0, UNIQUE_VALUES).boxed().collect(Collectors.toSet())); + } + case LONG -> { + LongBlock values = page.getBlock(0); + checkExpectedLong(prefix, values, 0, LongStream.range(0, UNIQUE_VALUES).boxed().collect(Collectors.toSet())); + } + default -> throw new IllegalArgumentException(prefix + " unsupported data type " + dataType); + } + } + + private static int checkExpectedBlock(String prefix, Block values, int position, Set expected) { + int valueCount = values.getValueCount(position); + if (valueCount != expected.size()) { + throw new IllegalArgumentException( + prefix + "[" + position + "] expected " + expected.size() + " values but count was " + valueCount + ); + } + return valueCount; + } + + private static void checkExpectedBytesRef(String prefix, BytesRefBlock values, int position, Set expected) { + int valueCount = checkExpectedBlock(prefix, values, position, expected); + BytesRef scratch = new BytesRef(); + for (int i = values.getFirstValueIndex(position); i < valueCount; i++) { + BytesRef v = values.getBytesRef(i, scratch); + if (expected.contains(v) == false) { + throw new IllegalArgumentException(prefix + "[" + position + "] expected " + v + " to be in " + expected); + } + } + } + + private static void checkExpectedInt(String prefix, IntBlock values, int position, Set expected) { + int valueCount = checkExpectedBlock(prefix, values, position, expected); + for (int i = values.getFirstValueIndex(position); i < valueCount; i++) { + Integer v = values.getInt(i); + if (expected.contains(v) == false) { + throw new IllegalArgumentException(prefix + "[" + position + "] expected " + v + " to be in " + expected); + } + } + } + + private static void checkExpectedLong(String prefix, LongBlock values, int position, Set expected) { + int valueCount = checkExpectedBlock(prefix, values, position, expected); + for (int i = values.getFirstValueIndex(position); i < valueCount; i++) { + Long v = values.getLong(i); + if (expected.contains(v) == false) { + throw new IllegalArgumentException(prefix + "[" + position + "] expected " + v + " to be in " + expected); + } + } + } + + private static Page page(int groups, String dataType) { + Block dataBlock = dataBlock(groups, dataType); + if (groups == 1) { + return new Page(dataBlock); + } + return new Page(groupingBlock(groups), dataBlock); + } + + private static Block dataBlock(int groups, String dataType) { + int blockLength = blockLength(groups); + return switch (dataType) { + case BYTES_REF -> { + try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(blockLength)) { + for (int i = 0; i < blockLength; i++) { + builder.appendBytesRef(KEYWORDS[i % KEYWORDS.length]); + } + yield builder.build(); + } + } + case INT -> { + try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(blockLength)) { + for (int i = 0; i < blockLength; i++) { + builder.appendInt(i % UNIQUE_VALUES); + } + yield builder.build(); + } + } + case LONG -> { + try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(blockLength)) { + for (int i = 0; i < blockLength; i++) { + builder.appendLong(i % UNIQUE_VALUES); + } + yield builder.build(); + } + } + default -> throw new IllegalArgumentException("unsupported data type " + dataType); + }; + } + + private static Block groupingBlock(int groups) { + int blockLength = blockLength(groups); + try (LongVector.Builder builder = blockFactory.newLongVectorBuilder(blockLength)) { + for (int i = 0; i < blockLength; i++) { + builder.appendLong(i % groups); + } + return builder.build().asBlock(); + } + } + + @Benchmark + public void run() { + run(groups, dataType, OP_COUNT); + } + + private static void run(int groups, String dataType, int opCount) { + DriverContext driverContext = driverContext(); + try (Operator operator = operator(driverContext, groups, dataType)) { + Page page = page(groups, dataType); + for (int i = 0; i < opCount; i++) { + operator.addInput(page.shallowCopy()); + } + operator.finish(); + checkExpected(groups, dataType, operator.getOutput()); + } + } + + static DriverContext driverContext() { + return new DriverContext(BigArrays.NON_RECYCLING_INSTANCE, blockFactory); + } + + static int blockLength(int groups) { + return Math.max(MIN_BLOCK_LENGTH, groups); + } +} diff --git a/docs/changelog/123073.yaml b/docs/changelog/123073.yaml new file mode 100644 index 0000000000000..95c6a4cbfd6b9 --- /dev/null +++ b/docs/changelog/123073.yaml @@ -0,0 +1,5 @@ +pr: 123073 +summary: Speed up VALUES for many buckets +area: ES|QL +type: bug +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java index ad0ab2f7189f6..f326492664fb8 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java @@ -8,6 +8,7 @@ package org.elasticsearch.compute.aggregation; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BytesRefHash; import org.elasticsearch.common.util.LongLongHash; @@ -151,46 +152,127 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - BytesRef scratch = new BytesRef(); - try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - long first = 0; - for (int id = 0; id < values.size(); id++) { - if (values.getKey1(id) == selectedGroup) { - long value = values.getKey2(id); - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendBytesRef(bytes.get(first, scratch)); - builder.appendBytesRef(bytes.get(value, scratch)); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. + */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ + BytesRef scratch = new BytesRef(); + try (BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start], scratch); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i], scratch); } - default -> builder.appendBytesRef(bytes.get(value, scratch)); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendBytesRef(bytes.get(first, scratch)); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } + private void append(BytesRefBlock.Builder builder, int id, BytesRef scratch) { + BytesRef value = bytes.get(values.getKey2(id), scratch); + builder.appendBytesRef(value); + } + @Override public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java index 271d7120092ca..752cd53a140f7 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.aggregation; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.util.LongLongHash; @@ -130,45 +131,126 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - try (DoubleBlock.Builder builder = blockFactory.newDoubleBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - double first = 0; - for (int id = 0; id < values.size(); id++) { - if (values.getKey1(id) == selectedGroup) { - double value = Double.longBitsToDouble(values.getKey2(id)); - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendDouble(first); - builder.appendDouble(value); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. + */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ + try (DoubleBlock.Builder builder = blockFactory.newDoubleBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]); } - default -> builder.appendDouble(value); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendDouble(first); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } + private void append(DoubleBlock.Builder builder, int id) { + double value = Double.longBitsToDouble(values.getKey2(id)); + builder.appendDouble(value); + } + @Override public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java index b44cad807fba2..91f1730ab3111 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.aggregation; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.compute.ann.Aggregator; @@ -135,47 +136,129 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - try (FloatBlock.Builder builder = blockFactory.newFloatBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - float first = 0; - for (int id = 0; id < values.size(); id++) { - long both = values.get(id); - int group = (int) (both >>> Float.SIZE); - if (group == selectedGroup) { - float value = Float.intBitsToFloat((int) both); - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendFloat(first); - builder.appendFloat(value); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. + */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ + try (FloatBlock.Builder builder = blockFactory.newFloatBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]); } - default -> builder.appendFloat(value); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendFloat(first); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } + private void append(FloatBlock.Builder builder, int id) { + long both = values.get(id); + float value = Float.intBitsToFloat((int) both); + builder.appendFloat(value); + } + @Override public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java index 4d0c518245694..c4f595d938aa9 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.aggregation; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.compute.ann.Aggregator; @@ -135,47 +136,129 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - int first = 0; - for (int id = 0; id < values.size(); id++) { - long both = values.get(id); - int group = (int) (both >>> Integer.SIZE); - if (group == selectedGroup) { - int value = (int) both; - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendInt(first); - builder.appendInt(value); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. + */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ + try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]); } - default -> builder.appendInt(value); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendInt(first); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } + private void append(IntBlock.Builder builder, int id) { + long both = values.get(id); + int value = (int) both; + builder.appendInt(value); + } + @Override public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java index 5471c90147ec4..8ae5da509151e 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java @@ -7,6 +7,7 @@ package org.elasticsearch.compute.aggregation; +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.util.LongLongHash; @@ -130,45 +131,126 @@ public void toIntermediate(Block[] blocks, int offset, IntVector selected, Drive blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } - try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(selected.getPositionCount())) { + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - long first = 0; - for (int id = 0; id < values.size(); id++) { - if (values.getKey1(id) == selectedGroup) { - long value = values.getKey2(id); - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.appendLong(first); - builder.appendLong(value); + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. + */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { + int group = (int) values.getKey1(id); + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ + try (LongBlock.Builder builder = blockFactory.newLongBlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]); } - default -> builder.appendLong(value); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.appendLong(first); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } + private void append(LongBlock.Builder builder, int id) { + long value = values.getKey2(id); + builder.appendLong(value); + } + @Override public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st index 3006af595be1f..68c6a8640cbd0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st @@ -10,6 +10,7 @@ package org.elasticsearch.compute.aggregation; $if(BytesRef)$ import org.apache.lucene.util.BytesRef; $endif$ +import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.util.BigArrays; $if(BytesRef)$ import org.elasticsearch.common.util.BytesRefHash; @@ -268,63 +269,157 @@ $endif$ blocks[offset] = toBlock(driverContext.blockFactory(), selected); } + /** + * Builds a {@link Block} with the unique values collected for the {@code #selected} + * groups. This is the implementation of the final and intermediate results of the agg. + */ Block toBlock(BlockFactory blockFactory, IntVector selected) { if (values.size() == 0) { return blockFactory.newConstantNullBlock(selected.getPositionCount()); } -$if(BytesRef)$ - BytesRef scratch = new BytesRef(); + + long selectedCountsSize = 0; + long idsSize = 0; + try { + /* + * Get a count of all groups less than the maximum selected group. Count + * *downwards* so that we can flip the sign on all of the actually selected + * groups. Negative values in this array are always unselected groups. + */ + int selectedCountsLen = selected.max() + 1; + long adjust = RamUsageEstimator.alignObjectSize( + RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + selectedCountsLen * Integer.BYTES + ); + blockFactory.adjustBreaker(adjust); + selectedCountsSize = adjust; + int[] selectedCounts = new int[selectedCountsLen]; + for (int id = 0; id < values.size(); id++) { +$if(long||BytesRef||double)$ + int group = (int) values.getKey1(id); +$elseif(float||int)$ + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); $endif$ - try ($Type$Block.Builder builder = blockFactory.new$Type$BlockBuilder(selected.getPositionCount())) { + if (group < selectedCounts.length) { + selectedCounts[group]--; + } + } + + /* + * Total the selected groups and turn the counts into the start index into a sort-of + * off-by-one running count. It's really the number of values that have been inserted + * into the results before starting on this group. Unselected groups will still + * have negative counts. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will contain 0, 3, -2, 4, 5 + */ + int total = 0; for (int s = 0; s < selected.getPositionCount(); s++) { - int selectedGroup = selected.getInt(s); - /* - * Count can effectively be in three states - 0, 1, many. We use those - * states to buffer the first value, so we can avoid calling - * beginPositionEntry on single valued fields. - */ - int count = 0; - $if(BytesRef)$long$else$$type$$endif$ first = 0; - for (int id = 0; id < values.size(); id++) { -$if(long||BytesRef)$ - if (values.getKey1(id) == selectedGroup) { - long value = values.getKey2(id); -$elseif(double)$ - if (values.getKey1(id) == selectedGroup) { - double value = Double.longBitsToDouble(values.getKey2(id)); -$elseif(float)$ - long both = values.get(id); - int group = (int) (both >>> Float.SIZE); - if (group == selectedGroup) { - float value = Float.intBitsToFloat((int) both); -$elseif(int)$ - long both = values.get(id); - int group = (int) (both >>> Integer.SIZE); - if (group == selectedGroup) { - int value = (int) both; + int group = selected.getInt(s); + int count = -selectedCounts[group]; + selectedCounts[group] = total; + total += count; + } + + /* + * Build a list of ids to insert in order *and* convert the running + * count in selectedCounts[group] into the end index (exclusive) in + * ids for each group. + * Here we use the negative counts to signal that a group hasn't been + * selected and the id containing values for that group is ignored. + * + * For example, if + * | Group | Value Count | Selected | + * |-------|-------------|----------| + * | 0 | 3 | <- | + * | 1 | 1 | <- | + * | 2 | 2 | | + * | 3 | 1 | <- | + * | 4 | 4 | <- | + * + * Then the total is 9 and the counts array will start with 0, 3, -2, 4, 5. + * The counts will end with 3, 4, -2, 5, 9. + */ + adjust = RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + total * Integer.BYTES); + blockFactory.adjustBreaker(adjust); + idsSize = adjust; + int[] ids = new int[total]; + for (int id = 0; id < values.size(); id++) { +$if(long||BytesRef||double)$ + int group = (int) values.getKey1(id); +$elseif(float||int)$ + long both = values.get(id); + int group = (int) (both >>> Float.SIZE); +$endif$ + if (group < selectedCounts.length && selectedCounts[group] >= 0) { + ids[selectedCounts[group]++] = id; + } + } + + /* + * Insert the ids in order. + */ +$if(BytesRef)$ + BytesRef scratch = new BytesRef(); $endif$ - switch (count) { - case 0 -> first = value; - case 1 -> { - builder.beginPositionEntry(); - builder.append$Type$($if(BytesRef)$bytes.get(first, scratch)$else$first$endif$); - builder.append$Type$($if(BytesRef)$bytes.get(value, scratch)$else$value$endif$); + try ($Type$Block.Builder builder = blockFactory.new$Type$BlockBuilder(selected.getPositionCount())) { + int start = 0; + for (int s = 0; s < selected.getPositionCount(); s++) { + int group = selected.getInt(s); + int end = selectedCounts[group]; + int count = end - start; + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> append(builder, ids[start]$if(BytesRef)$, scratch$endif$); + default -> { + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + append(builder, ids[i]$if(BytesRef)$, scratch$endif$); } - default -> builder.append$Type$($if(BytesRef)$bytes.get(value, scratch)$else$value$endif$); + builder.endPositionEntry(); } - count++; } + start = end; } - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> builder.append$Type$($if(BytesRef)$bytes.get(first, scratch)$else$first$endif$); - default -> builder.endPositionEntry(); - } + return builder.build(); } - return builder.build(); + } finally { + blockFactory.adjustBreaker(-selectedCountsSize - idsSize); } } +$if(BytesRef)$ + private void append($Type$Block.Builder builder, int id, BytesRef scratch) { + BytesRef value = bytes.get(values.getKey2(id), scratch); + builder.appendBytesRef(value); + } + +$else$ + private void append($Type$Block.Builder builder, int id) { +$if(long)$ + long value = values.getKey2(id); +$elseif(double)$ + double value = Double.longBitsToDouble(values.getKey2(id)); +$elseif(float)$ + long both = values.get(id); + float value = Float.intBitsToFloat((int) both); +$elseif(int)$ + long both = values.get(id); + int value = (int) both; +$endif$ + builder.append$Type$(value); + } + +$endif$ @Override public void enableGroupIdTracking(SeenGroupIds seen) { // we figure out seen values from nulls on the values block