Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support SKEWNESS and KURTOSIS aggregates #10021

Merged
merged 3 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
Expand Down Expand Up @@ -125,7 +126,8 @@ public enum ObjectType {
DoubleLongPair(30),
StringLongPair(31),
CovarianceTuple(32),
VarianceTuple(33);
VarianceTuple(33),
PinotFourthMoment(34);

private final int _value;

Expand Down Expand Up @@ -209,6 +211,8 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.CovarianceTuple;
} else if (value instanceof VarianceTuple) {
return ObjectType.VarianceTuple;
} else if (value instanceof PinotFourthMoment) {
return ObjectType.PinotFourthMoment;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
Expand Down Expand Up @@ -483,6 +487,24 @@ public VarianceTuple deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<PinotFourthMoment> PINOT_FOURTH_MOMENT_OBJECT_SER_DE
= new ObjectSerDe<PinotFourthMoment>() {
@Override
public byte[] serialize(PinotFourthMoment value) {
return value.serialize();
}

@Override
public PinotFourthMoment deserialize(byte[] bytes) {
return PinotFourthMoment.fromBytes(bytes);
}

@Override
public PinotFourthMoment deserialize(ByteBuffer byteBuffer) {
return PinotFourthMoment.fromBytes(byteBuffer);
}
};

public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new ObjectSerDe<HyperLogLog>() {

@Override
Expand Down Expand Up @@ -1213,7 +1235,8 @@ public Double2LongOpenHashMap deserialize(ByteBuffer byteBuffer) {
DOUBLE_LONG_PAIR_SER_DE,
STRING_LONG_PAIR_SER_DE,
COVARIANCE_TUPLE_OBJECT_SER_DE,
VARIANCE_TUPLE_OBJECT_SER_DE
VARIANCE_TUPLE_OBJECT_SER_DE,
PINOT_FOURTH_MOMENT_OBJECT_SER_DE
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
return new VarianceAggregationFunction(firstArgument, false, true);
case STDDEVSAMP:
return new VarianceAggregationFunction(firstArgument, true, true);
case SKEWNESS:
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.SKEWNESS);
case KURTOSIS:
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
default:
throw new IllegalArgumentException();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.aggregation.function;

import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils;
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.spi.AggregationFunctionType;


public class FourthMomentAggregationFunction extends BaseSingleInputAggregationFunction<PinotFourthMoment, Double> {

private final Type _type;

enum Type {
KURTOSIS, SKEWNESS
}

public FourthMomentAggregationFunction(ExpressionContext expression, Type type) {
super(expression);
_type = type;
}

@Override
public AggregationFunctionType getType() {
switch (_type) {
case KURTOSIS:
return AggregationFunctionType.KURTOSIS;
case SKEWNESS:
return AggregationFunctionType.SKEWNESS;
default:
throw new IllegalArgumentException("Unexpected type " + _type);
}
}

@Override
public AggregationResultHolder createAggregationResultHolder() {
return new ObjectAggregationResultHolder();
}

@Override
public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
}

@Override
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);

PinotFourthMoment m4 = aggregationResultHolder.getResult();
if (m4 == null) {
m4 = new PinotFourthMoment();
aggregationResultHolder.setValue(m4);
}

for (int i = 0; i < length; i++) {
m4.increment(values[i]);
}
}

@Override
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
for (int i = 0; i < length; i++) {
PinotFourthMoment m4 = groupByResultHolder.getResult(groupKeyArray[i]);
if (m4 == null) {
m4 = new PinotFourthMoment();
groupByResultHolder.setValueForKey(groupKeyArray[i], m4);
}
m4.increment(values[i]);
}
}

@Override
public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
for (int i = 0; i < length; i++) {
for (int groupKey : groupKeysArray[i]) {
PinotFourthMoment m4 = groupByResultHolder.getResult(groupKey);
if (m4 == null) {
m4 = new PinotFourthMoment();
groupByResultHolder.setValueForKey(groupKey, m4);
}
m4.increment(values[i]);
}
}
}

@Override
public PinotFourthMoment extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
PinotFourthMoment m4 = aggregationResultHolder.getResult();
if (m4 == null) {
return new PinotFourthMoment();
} else {
return m4;
}
}

@Override
public PinotFourthMoment extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
PinotFourthMoment m4 = groupByResultHolder.getResult(groupKey);
if (m4 == null) {
return new PinotFourthMoment();
} else {
return m4;
}
}

@Override
public PinotFourthMoment merge(PinotFourthMoment intermediateResult1, PinotFourthMoment intermediateResult2) {
intermediateResult1.combine(intermediateResult2);
return intermediateResult1;
}

@Override
public DataSchema.ColumnDataType getIntermediateResultColumnType() {
return DataSchema.ColumnDataType.OBJECT;
}

@Override
public DataSchema.ColumnDataType getFinalResultColumnType() {
return DataSchema.ColumnDataType.DOUBLE;
}

@Override
public Double extractFinalResult(PinotFourthMoment m4) {
if (m4 == null) {
return null;
}

switch (_type) {
case KURTOSIS:
return m4.kurtosis();
case SKEWNESS:
return m4.skew();
default:
throw new IllegalStateException("Unexpected value: " + _type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,20 @@ public void testGetAggregationFunction() {
assertEquals(aggregationFunction.getType(), AggregationFunctionType.BOOLOR);
assertEquals(aggregationFunction.getColumnName(), "boolOr_column");
assertEquals(aggregationFunction.getResultColumnName(), "boolor(column)");

function = getFunction("skewness");
aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction);
assertEquals(aggregationFunction.getType(), AggregationFunctionType.SKEWNESS);
assertEquals(aggregationFunction.getColumnName(), "skewness_column");
assertEquals(aggregationFunction.getResultColumnName(), "skewness(column)");

function = getFunction("kurtosis");
aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction);
assertEquals(aggregationFunction.getType(), AggregationFunctionType.KURTOSIS);
assertEquals(aggregationFunction.getColumnName(), "kurtosis_column");
assertEquals(aggregationFunction.getResultColumnName(), "kurtosis(column)");
}

private FunctionContext getFunction(String functionName) {
Expand Down
Loading