Skip to content

Commit

Permalink
[ES|QL] Support some stats on aggregate_metric_double (elastic#120343)
Browse files Browse the repository at this point in the history
Adds non-grouping support for min, max, sum, and count, using
CompositeBlock as the underlying block type and an internal
FromAggregateMetricDouble function to handle converting from
CompositeBlock to the correct metric subfields.

Closes elastic#110649
  • Loading branch information
limotova authored Jan 29, 2025
1 parent c6e722e commit bcd8d15
Show file tree
Hide file tree
Showing 36 changed files with 1,019 additions and 102 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/120343.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 120343
summary: Support some stats on aggregate_metric_double
area: "ES|QL"
type: enhancement
issues:
- 110649
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@ interface BlockFactory {
SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count);

// TODO support non-singleton ords

AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count);
}

/**
Expand Down Expand Up @@ -501,4 +503,16 @@ interface SingletonOrdinalsBuilder extends Builder {
*/
SingletonOrdinalsBuilder appendOrd(int value);
}

interface AggregateMetricDoubleBuilder extends Builder {

DoubleBuilder min();

DoubleBuilder max();

DoubleBuilder sum();

IntBuilder count();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public SingletonOrdsBuilder appendOrd(int value) {
}
return new SingletonOrdsBuilder();
}

@Override
public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) {
return null;
}
};
}

Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.skipTest("esql/180_match_operator/match with disjunctions", "Disjunctions in full text functions work now")
// Expected deprecation warning to compat yaml tests:
task.addAllowedWarningRegex(".*rollup functionality will be removed in Elasticsearch.*")
task.skipTest("esql/40_tsdb/from doc with aggregate_metric_double", "TODO: support for subset of metric fields")
task.skipTest("esql/40_tsdb/stats on aggregate_metric_double", "TODO: support for subset of metric fields")
task.skipTest("esql/40_tsdb/from index pattern unsupported counter", "TODO: support for subset of metric fields")
task.skipTest("esql/40_unsupported_types/unsupported", "TODO: support for subset of metric fields")
task.skipTest("esql/40_unsupported_types/unsupported with sort", "TODO: support for subset of metric fields")
})

tasks.named('yamlRestCompatTest').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
public class EsqlCorePlugin extends Plugin implements ExtensiblePlugin {

public static final FeatureFlag SEMANTIC_TEXT_FEATURE_FLAG = new FeatureFlag("esql_semantic_text");
public static final FeatureFlag AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG = new FeatureFlag("esql_aggregate_metric_double");
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ public enum DataType {
* loaded from the index and ESQL will load these fields as strings without their attached
* chunks or embeddings.
*/
SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize());
SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize()),

AGGREGATE_METRIC_DOUBLE(builder().esType("aggregate_metric_double").estimatedSize(Double.BYTES * 3 + Integer.BYTES));

/**
* Types that are actively being built. These types are not returned
Expand All @@ -316,7 +318,8 @@ public enum DataType {
* check that sending them to a function produces a sane error message.
*/
public static final Map<DataType, FeatureFlag> UNDER_CONSTRUCTION = Map.ofEntries(
Map.entry(SEMANTIC_TEXT, EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG)
Map.entry(SEMANTIC_TEXT, EsqlCorePlugin.SEMANTIC_TEXT_FEATURE_FLAG),
Map.entry(AGGREGATE_METRIC_DOUBLE, EsqlCorePlugin.AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG)
);

private final String typeName;
Expand Down Expand Up @@ -553,6 +556,7 @@ public static boolean isRepresentable(DataType t) {
&& t != SOURCE
&& t != HALF_FLOAT
&& t != PARTIAL_AGG
&& t != AGGREGATE_METRIC_DOUBLE
&& t.isCounter() == false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.data;

import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;

public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {

private DoubleBlockBuilder minBuilder;
private DoubleBlockBuilder maxBuilder;
private DoubleBlockBuilder sumBuilder;
private IntBlockBuilder countBuilder;

public AggregateMetricDoubleBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
super(blockFactory);
minBuilder = null;
maxBuilder = null;
sumBuilder = null;
countBuilder = null;
try {
minBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
maxBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
sumBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
countBuilder = new IntBlockBuilder(estimatedSize, blockFactory);
} finally {
if (countBuilder == null) {
Releasables.closeWhileHandlingException(minBuilder, maxBuilder, sumBuilder, countBuilder);
}
}
}

@Override
protected int valuesLength() {
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
}

@Override
protected void growValuesArray(int newSize) {
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
}

@Override
protected int elementSize() {
throw new UnsupportedOperationException("Not available on aggregate_metric_double");
}

@Override
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
Block minBlock;
Block maxBlock;
Block sumBlock;
Block countBlock;
if (block.areAllValuesNull()) {
minBlock = block;
maxBlock = block;
sumBlock = block;
countBlock = block;
} else {
CompositeBlock composite = (CompositeBlock) block;
minBlock = composite.getBlock(Metric.MIN.getIndex());
maxBlock = composite.getBlock(Metric.MAX.getIndex());
sumBlock = composite.getBlock(Metric.SUM.getIndex());
countBlock = composite.getBlock(Metric.COUNT.getIndex());
}
minBuilder.copyFrom(minBlock, beginInclusive, endExclusive);
maxBuilder.copyFrom(maxBlock, beginInclusive, endExclusive);
sumBuilder.copyFrom(sumBlock, beginInclusive, endExclusive);
countBuilder.copyFrom(countBlock, beginInclusive, endExclusive);
return this;
}

@Override
public AbstractBlockBuilder appendNull() {
minBuilder.appendNull();
maxBuilder.appendNull();
sumBuilder.appendNull();
countBuilder.appendNull();
return this;
}

@Override
public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
minBuilder.mvOrdering(mvOrdering);
maxBuilder.mvOrdering(mvOrdering);
sumBuilder.mvOrdering(mvOrdering);
countBuilder.mvOrdering(mvOrdering);
return this;
}

@Override
public Block build() {
Block[] blocks = new Block[4];
boolean success = false;
try {
finish();
blocks[Metric.MIN.getIndex()] = minBuilder.build();
blocks[Metric.MAX.getIndex()] = maxBuilder.build();
blocks[Metric.SUM.getIndex()] = sumBuilder.build();
blocks[Metric.COUNT.getIndex()] = countBuilder.build();
CompositeBlock block = new CompositeBlock(blocks);
success = true;
return block;
} finally {
if (success == false) {
Releasables.closeExpectNoException(blocks);
}
}
}

@Override
protected void extraClose() {
Releasables.closeExpectNoException(minBuilder, maxBuilder, sumBuilder, countBuilder);
}

@Override
public BlockLoader.DoubleBuilder min() {
return minBuilder;
}

@Override
public BlockLoader.DoubleBuilder max() {
return maxBuilder;
}

@Override
public BlockLoader.DoubleBuilder sum() {
return sumBuilder;
}

@Override
public BlockLoader.IntBuilder count() {
return countBuilder;
}

public enum Metric {
MIN(0),
MAX(1),
SUM(2),
COUNT(3);

private final int index;

Metric(int index) {
this.index = index;
}

public int getIndex() {
return index;
}
}

public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) {
public AggregateMetricDoubleLiteral {
min = min.isNaN() ? null : min;
max = max.isNaN() ? null : max;
sum = sum.isNaN() ? null : sum;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,39 @@ public Block newConstantNullBlock(int positions) {
return b;
}

public AggregateMetricDoubleBlockBuilder newAggregateMetricDoubleBlockBuilder(int estimatedSize) {
return new AggregateMetricDoubleBlockBuilder(estimatedSize, this);
}

public final Block newConstantAggregateMetricDoubleBlock(
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral value,
int positions
) {
try (AggregateMetricDoubleBlockBuilder builder = newAggregateMetricDoubleBlockBuilder(positions)) {
if (value.min() != null) {
builder.min().appendDouble(value.min());
} else {
builder.min().appendNull();
}
if (value.max() != null) {
builder.max().appendDouble(value.max());
} else {
builder.max().appendNull();
}
if (value.sum() != null) {
builder.sum().appendDouble(value.sum());
} else {
builder.sum().appendNull();
}
if (value.count() != null) {
builder.count().appendInt(value.count());
} else {
builder.count().appendNull();
}
return builder.build();
}
}

/**
* Returns the maximum number of bytes that a Block should be backed by a primitive array before switching to using BigArrays.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

Expand Down Expand Up @@ -233,6 +234,14 @@ private static Block constantBlock(BlockFactory blockFactory, ElementType type,
case BYTES_REF -> blockFactory.newConstantBytesRefBlockWith(toBytesRef(val), size);
case DOUBLE -> blockFactory.newConstantDoubleBlockWith((double) val, size);
case BOOLEAN -> blockFactory.newConstantBooleanBlockWith((boolean) val, size);
case COMPOSITE -> {
if (val instanceof AggregateMetricDoubleLiteral aggregateMetricDoubleLiteral) {
yield blockFactory.newConstantAggregateMetricDoubleBlock(aggregateMetricDoubleLiteral, size);
}
throw new UnsupportedOperationException(
"Composite block but received value that wasn't AggregateMetricDoubleLiteral [" + val + "]"
);
}
default -> throw new UnsupportedOperationException("unsupported element type [" + type + "]");
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,22 @@ public int getTotalValueCount() {

@Override
public int getFirstValueIndex(int position) {
throw new UnsupportedOperationException("Composite block");
return blocks[0].getFirstValueIndex(position);
}

@Override
public int getValueCount(int position) {
throw new UnsupportedOperationException("Composite block");
return blocks[0].getValueCount(position);
}

@Override
public boolean isNull(int position) {
throw new UnsupportedOperationException("Composite block");
for (Block block : blocks) {
if (block.isNull(position) == false) {
return false;
}
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public enum ElementType {
/**
* Composite blocks which contain array of sub-blocks.
*/
COMPOSITE("Composite", (blockFactory, estimatedSize) -> { throw new UnsupportedOperationException("can't build composite blocks"); }),
COMPOSITE("Composite", BlockFactory::newAggregateMetricDoubleBlockBuilder),

/**
* Intermediate blocks which don't support retrieving elements.
Expand Down Expand Up @@ -73,6 +73,8 @@ public static ElementType fromJava(Class<?> type) {
elementType = BYTES_REF;
} else if (type == Boolean.class) {
elementType = BOOLEAN;
} else if (type == AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.class) {
elementType = COMPOSITE;
} else if (type == null || type == Void.class) {
elementType = NULL;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,11 @@ public BytesRefBlock constantBytes(BytesRef value) {
public BlockLoader.SingletonOrdinalsBuilder singletonOrdinalsBuilder(SortedDocValues ordinals, int count) {
return new SingletonOrdinalsBuilder(factory, ordinals, count);
}

@Override
public BlockLoader.AggregateMetricDoubleBuilder aggregateMetricDoubleBuilder(int count) {
return factory.newAggregateMetricDoubleBlockBuilder(count);
}
}

// TODO tests that mix source loaded fields and doc values in the same block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,8 @@ public static Literal randomLiteral(DataType type) {
throw new UncheckedIOException(e);
}
}
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG -> throw new IllegalArgumentException(
"can't make random values for [" + type.typeName() + "]"
);
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE ->
throw new IllegalArgumentException("can't make random values for [" + type.typeName() + "]");
}, type);
}

Expand Down
Loading

0 comments on commit bcd8d15

Please sign in to comment.