Skip to content

Commit

Permalink
[ESQL] Enable "any type" aggregations on Date Nanos (elastic#114438)
Browse files Browse the repository at this point in the history
Resolves elastic#110002
Resolves elastic#110003
Resolves elastic#110005

Enable Values, Count, CountDistinct, Min and Max aggregations on date nanos. In the course of addressing this, I had to make some changes to AggregateMapper where it maps types into string names. I tried to refactor this once before (elastic#110841) but at the time we decided not to go ahead with it. That bit me while working on this, and so I am trying again to refactor it. This time I've made a more localized change, just replacing the cascading if block with a switch. That will cause a compile time failure when future new data types are added, unless they correctly update this section.

I've also done a small refactoring on the aggregators themselves, to make the supplier function consistent with the typeResolution.


---------

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
2 people authored and jfreden committed Nov 4, 2024
1 parent 838e2b0 commit ce43e94
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ millis:date,nanos:date_nanos,num:long
2023-10-23T13:33:34.937Z,2023-10-23T13:33:34.937193000Z,1698068014937193000
2023-10-23T12:27:28.948Z,2023-10-23T12:27:28.948000000Z,1698064048948000000
2023-10-23T12:15:03.360Z,2023-10-23T12:15:03.360103847Z,1698063303360103847
2023-10-23T12:15:03.360Z,2023-10-23T12:15:03.360103847Z,1698063303360103847
1999-10-23T12:15:03.360Z,[2023-03-23T12:15:03.360103847Z, 2023-02-23T13:33:34.937193000Z, 2023-01-23T13:55:01.543123456Z], 0
1999-10-22T12:15:03.360Z,[2023-03-23T12:15:03.360103847Z, 2023-03-23T12:15:03.360103847Z, 2023-03-23T12:15:03.360103847Z], 0
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ l:long
1698068014937193000
1698064048948000000
1698063303360103847
1698063303360103847
;

long to date nanos, index version
Expand All @@ -231,6 +232,7 @@ d:date_nanos
2023-10-23T13:33:34.937193000Z
2023-10-23T12:27:28.948000000Z
2023-10-23T12:15:03.360103847Z
2023-10-23T12:15:03.360103847Z
;

date_nanos to date nanos, index version
Expand All @@ -246,6 +248,7 @@ d:date_nanos
2023-10-23T13:33:34.937193000Z
2023-10-23T12:27:28.948000000Z
2023-10-23T12:15:03.360103847Z
2023-10-23T12:15:03.360103847Z
;

attempt to cast the result of a fold to date nanos
Expand Down Expand Up @@ -331,3 +334,31 @@ a:date_nanos
[2023-02-23T13:33:34.937193000Z, 2023-03-23T12:15:03.360103847Z]
[2023-03-23T12:15:03.360103847Z, 2023-03-23T12:15:03.360103847Z]
;


Max and Min of date nanos
required_capability: date_nanos_aggregations

FROM date_nanos | STATS max = MAX(nanos), min = MIN(nanos);

max:date_nanos | min:date_nanos
2023-10-23T13:55:01.543123456Z | 2023-01-23T13:55:01.543123456Z
;

Count and count distinct of date nanos
required_capability: date_nanos_aggregations

FROM date_nanos | WHERE millis > "2020-01-01" | STATS count = COUNT(nanos), count_distinct = COUNT_DISTINCT(nanos);

count:long | count_distinct:long
8 | 7
;

Values aggregation on date nanos
required_capability: date_nanos_aggregations

FROM date_nanos | WHERE millis > "2020-01-01" | STATS v = MV_SORT(VALUES(nanos), "DESC");

v:date_nanos
[2023-10-23T13:55:01.543123456Z, 2023-10-23T13:53:55.832987654Z, 2023-10-23T13:52:55.015787878Z, 2023-10-23T13:51:54.732102837Z, 2023-10-23T13:33:34.937193000Z, 2023-10-23T12:27:28.948000000Z, 2023-10-23T12:15:03.360103847Z]
;
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,11 @@ public enum Cap {
*/
LEAST_GREATEST_FOR_DATENANOS(EsqlCorePlugin.DATE_NANOS_FEATURE_FLAG),

/**
* support aggregations on date nanos
*/
DATE_NANOS_AGGREGATIONS(EsqlCorePlugin.DATE_NANOS_FEATURE_FLAG),

/**
* Support for datetime in least and greatest functions
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
Expand All @@ -53,6 +55,20 @@ public class CountDistinct extends AggregateFunction implements OptionalArgument
CountDistinct::new
);

private static final Map<DataType, BiFunction<List<Integer>, Integer, AggregatorFunctionSupplier>> SUPPLIERS = Map.ofEntries(
// Booleans ignore the precision because there are only two possible values anyway
Map.entry(DataType.BOOLEAN, (inputChannels, precision) -> new CountDistinctBooleanAggregatorFunctionSupplier(inputChannels)),
Map.entry(DataType.LONG, CountDistinctLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DATETIME, CountDistinctLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DATE_NANOS, CountDistinctLongAggregatorFunctionSupplier::new),
Map.entry(DataType.INTEGER, CountDistinctIntAggregatorFunctionSupplier::new),
Map.entry(DataType.DOUBLE, CountDistinctDoubleAggregatorFunctionSupplier::new),
Map.entry(DataType.KEYWORD, CountDistinctBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.IP, CountDistinctBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.VERSION, CountDistinctBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.TEXT, CountDistinctBytesRefAggregatorFunctionSupplier::new)
);

private static final int DEFAULT_PRECISION = 3000;
private final Expression precision;

Expand Down Expand Up @@ -102,7 +118,7 @@ public CountDistinct(
Source source,
@Param(
name = "field",
type = { "boolean", "date", "double", "integer", "ip", "keyword", "long", "text", "version" },
type = { "boolean", "date", "date_nanos", "double", "integer", "ip", "keyword", "long", "text", "version" },
description = "Column or literal for which to count the number of distinct values."
) Expression field,
@Param(
Expand Down Expand Up @@ -179,7 +195,7 @@ protected TypeResolution resolveType() {
.and(
isType(
field(),
dt -> dt != DataType.UNSIGNED_LONG && dt != DataType.SOURCE,
SUPPLIERS::containsKey,
sourceText(),
DEFAULT,
"any exact type except unsigned_long, _source, or counter types"
Expand All @@ -196,23 +212,11 @@ protected TypeResolution resolveType() {
public AggregatorFunctionSupplier supplier(List<Integer> inputChannels) {
DataType type = field().dataType();
int precision = this.precision == null ? DEFAULT_PRECISION : ((Number) this.precision.fold()).intValue();
if (type == DataType.BOOLEAN) {
// Booleans ignore the precision because there are only two possible values anyway
return new CountDistinctBooleanAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.DATETIME || type == DataType.LONG) {
return new CountDistinctLongAggregatorFunctionSupplier(inputChannels, precision);
}
if (type == DataType.INTEGER) {
return new CountDistinctIntAggregatorFunctionSupplier(inputChannels, precision);
}
if (type == DataType.DOUBLE) {
return new CountDistinctDoubleAggregatorFunctionSupplier(inputChannels, precision);
}
if (DataType.isString(type) || type == DataType.IP || type == DataType.VERSION) {
return new CountDistinctBytesRefAggregatorFunctionSupplier(inputChannels, precision);
if (SUPPLIERS.containsKey(type) == false) {
// If the type checking did its job, this should never happen
throw EsqlIllegalArgumentException.illegalDataType(type);
}
throw EsqlIllegalArgumentException.illegalDataType(type);
return SUPPLIERS.get(type).apply(inputChannels, precision);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,28 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG;
import static org.elasticsearch.xpack.esql.core.type.DataType.isRepresentable;
import static org.elasticsearch.xpack.esql.core.type.DataType.isSpatial;

public class Max extends AggregateFunction implements ToAggregator, SurrogateExpression {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Max", Max::new);

private static final Map<DataType, Function<List<Integer>, AggregatorFunctionSupplier>> SUPPLIERS = Map.ofEntries(
Map.entry(DataType.BOOLEAN, MaxBooleanAggregatorFunctionSupplier::new),
Map.entry(DataType.LONG, MaxLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DATETIME, MaxLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DATE_NANOS, MaxLongAggregatorFunctionSupplier::new),
Map.entry(DataType.INTEGER, MaxIntAggregatorFunctionSupplier::new),
Map.entry(DataType.DOUBLE, MaxDoubleAggregatorFunctionSupplier::new),
Map.entry(DataType.IP, MaxIpAggregatorFunctionSupplier::new),
Map.entry(DataType.KEYWORD, MaxBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.TEXT, MaxBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.VERSION, MaxBytesRefAggregatorFunctionSupplier::new)
);

@FunctionInfo(
returnType = { "boolean", "double", "integer", "long", "date", "ip", "keyword", "text", "long", "version" },
description = "The maximum value of a field.",
Expand Down Expand Up @@ -98,7 +110,7 @@ public Max replaceChildren(List<Expression> newChildren) {
protected TypeResolution resolveType() {
return TypeResolutions.isType(
field(),
t -> isRepresentable(t) && t != UNSIGNED_LONG && isSpatial(t) == false,
SUPPLIERS::containsKey,
sourceText(),
DEFAULT,
"representable except unsigned_long and spatial types"
Expand All @@ -113,25 +125,11 @@ public DataType dataType() {
@Override
public final AggregatorFunctionSupplier supplier(List<Integer> inputChannels) {
DataType type = field().dataType();
if (type == DataType.BOOLEAN) {
return new MaxBooleanAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.LONG || type == DataType.DATETIME) {
return new MaxLongAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.INTEGER) {
return new MaxIntAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.DOUBLE) {
return new MaxDoubleAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.IP) {
return new MaxIpAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.VERSION || DataType.isString(type)) {
return new MaxBytesRefAggregatorFunctionSupplier(inputChannels);
if (SUPPLIERS.containsKey(type) == false) {
// If the type checking did its job, this should never happen
throw EsqlIllegalArgumentException.illegalDataType(type);
}
throw EsqlIllegalArgumentException.illegalDataType(type);
return SUPPLIERS.get(type).apply(inputChannels);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,28 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG;
import static org.elasticsearch.xpack.esql.core.type.DataType.isRepresentable;
import static org.elasticsearch.xpack.esql.core.type.DataType.isSpatial;

public class Min extends AggregateFunction implements ToAggregator, SurrogateExpression {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Min", Min::new);

private static final Map<DataType, Function<List<Integer>, AggregatorFunctionSupplier>> SUPPLIERS = Map.ofEntries(
Map.entry(DataType.BOOLEAN, MinBooleanAggregatorFunctionSupplier::new),
Map.entry(DataType.LONG, MinLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DATETIME, MinLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DATE_NANOS, MinLongAggregatorFunctionSupplier::new),
Map.entry(DataType.INTEGER, MinIntAggregatorFunctionSupplier::new),
Map.entry(DataType.DOUBLE, MinDoubleAggregatorFunctionSupplier::new),
Map.entry(DataType.IP, MinIpAggregatorFunctionSupplier::new),
Map.entry(DataType.VERSION, MinBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.KEYWORD, MinBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.TEXT, MinBytesRefAggregatorFunctionSupplier::new)
);

@FunctionInfo(
returnType = { "boolean", "double", "integer", "long", "date", "ip", "keyword", "text", "long", "version" },
description = "The minimum value of a field.",
Expand Down Expand Up @@ -98,7 +110,7 @@ public Min withFilter(Expression filter) {
protected TypeResolution resolveType() {
return TypeResolutions.isType(
field(),
t -> isRepresentable(t) && t != UNSIGNED_LONG && isSpatial(t) == false,
SUPPLIERS::containsKey,
sourceText(),
DEFAULT,
"representable except unsigned_long and spatial types"
Expand All @@ -113,25 +125,11 @@ public DataType dataType() {
@Override
public final AggregatorFunctionSupplier supplier(List<Integer> inputChannels) {
DataType type = field().dataType();
if (type == DataType.BOOLEAN) {
return new MinBooleanAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.LONG || type == DataType.DATETIME) {
return new MinLongAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.INTEGER) {
return new MinIntAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.DOUBLE) {
return new MinDoubleAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.IP) {
return new MinIpAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.VERSION || DataType.isString(type)) {
return new MinBytesRefAggregatorFunctionSupplier(inputChannels);
if (SUPPLIERS.containsKey(type) == false) {
// If the type checking did its job, this should never happen
throw EsqlIllegalArgumentException.illegalDataType(type);
}
throw EsqlIllegalArgumentException.illegalDataType(type);
return SUPPLIERS.get(type).apply(inputChannels);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,28 @@

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static java.util.Collections.emptyList;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG;

public class Values extends AggregateFunction implements ToAggregator {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Values", Values::new);

private static final Map<DataType, Function<List<Integer>, AggregatorFunctionSupplier>> SUPPLIERS = Map.ofEntries(
Map.entry(DataType.INTEGER, ValuesIntAggregatorFunctionSupplier::new),
Map.entry(DataType.LONG, ValuesLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DATETIME, ValuesLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DATE_NANOS, ValuesLongAggregatorFunctionSupplier::new),
Map.entry(DataType.DOUBLE, ValuesDoubleAggregatorFunctionSupplier::new),
Map.entry(DataType.KEYWORD, ValuesBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.TEXT, ValuesBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.IP, ValuesBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.VERSION, ValuesBytesRefAggregatorFunctionSupplier::new),
Map.entry(DataType.BOOLEAN, ValuesBooleanAggregatorFunctionSupplier::new)
);

@FunctionInfo(
returnType = { "boolean", "date", "double", "integer", "ip", "keyword", "long", "text", "version" },
preview = true,
Expand Down Expand Up @@ -98,7 +112,7 @@ public DataType dataType() {
protected TypeResolution resolveType() {
return TypeResolutions.isType(
field(),
dt -> DataType.isSpatial(dt) == false && dt != UNSIGNED_LONG,
SUPPLIERS::containsKey,
sourceText(),
DEFAULT,
"any type except unsigned_long and spatial types"
Expand All @@ -108,22 +122,10 @@ protected TypeResolution resolveType() {
@Override
public AggregatorFunctionSupplier supplier(List<Integer> inputChannels) {
DataType type = field().dataType();
if (type == DataType.INTEGER) {
return new ValuesIntAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.LONG || type == DataType.DATETIME) {
return new ValuesLongAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.DOUBLE) {
return new ValuesDoubleAggregatorFunctionSupplier(inputChannels);
}
if (DataType.isString(type) || type == DataType.IP || type == DataType.VERSION) {
return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels);
}
if (type == DataType.BOOLEAN) {
return new ValuesBooleanAggregatorFunctionSupplier(inputChannels);
if (SUPPLIERS.containsKey(type) == false) {
// If the type checking did its job, this should never happen
throw EsqlIllegalArgumentException.illegalDataType(type);
}
// TODO cartesian_point, geo_point
throw EsqlIllegalArgumentException.illegalDataType(type);
return SUPPLIERS.get(type).apply(inputChannels);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,25 +297,18 @@ private static String dataTypeToString(DataType type, Class<?> aggClass) {
if (aggClass == Top.class && type.equals(DataType.IP)) {
return "Ip";
}
if (type.equals(DataType.BOOLEAN)) {
return "Boolean";
} else if (type.equals(DataType.INTEGER) || type.equals(DataType.COUNTER_INTEGER)) {
return "Int";
} else if (type.equals(DataType.LONG) || type.equals(DataType.DATETIME) || type.equals(DataType.COUNTER_LONG)) {
return "Long";
} else if (type.equals(DataType.DOUBLE) || type.equals(DataType.COUNTER_DOUBLE)) {
return "Double";
} else if (type.equals(DataType.KEYWORD)
|| type.equals(DataType.IP)
|| type.equals(DataType.VERSION)
|| type.equals(DataType.TEXT)) {
return "BytesRef";
} else if (type.equals(GEO_POINT)) {
return "GeoPoint";
} else if (type.equals(CARTESIAN_POINT)) {
return "CartesianPoint";
} else {

return switch (type) {
case DataType.BOOLEAN -> "Boolean";
case DataType.INTEGER, DataType.COUNTER_INTEGER -> "Int";
case DataType.LONG, DataType.DATETIME, DataType.COUNTER_LONG, DataType.DATE_NANOS -> "Long";
case DataType.DOUBLE, DataType.COUNTER_DOUBLE -> "Double";
case DataType.KEYWORD, DataType.IP, DataType.VERSION, DataType.TEXT -> "BytesRef";
case GEO_POINT -> "GeoPoint";
case CARTESIAN_POINT -> "CartesianPoint";
case SEMANTIC_TEXT, UNSUPPORTED, NULL, UNSIGNED_LONG, SHORT, BYTE, FLOAT, HALF_FLOAT, SCALED_FLOAT, OBJECT, SOURCE, DATE_PERIOD,
TIME_DURATION, CARTESIAN_SHAPE, GEO_SHAPE, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG ->
throw new EsqlIllegalArgumentException("illegal agg type: " + type.typeName());
}
};
}
}

0 comments on commit ce43e94

Please sign in to comment.