Skip to content

Commit

Permalink
support SKEW_POP and KURTOSIS_POP aggregates
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Dec 21, 2022
1 parent c034503 commit 49607d7
Show file tree
Hide file tree
Showing 8 changed files with 573 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.pinot.segment.local.customobject.IntLongPair;
import org.apache.pinot.segment.local.customobject.LongLongPair;
import org.apache.pinot.segment.local.customobject.MinMaxRangePair;
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.local.customobject.QuantileDigest;
import org.apache.pinot.segment.local.customobject.StringLongPair;
import org.apache.pinot.segment.local.customobject.VarianceTuple;
Expand Down Expand Up @@ -125,7 +126,8 @@ public enum ObjectType {
DoubleLongPair(30),
StringLongPair(31),
CovarianceTuple(32),
VarianceTuple(33);
VarianceTuple(33),
PinotFourthMoment(34);

private final int _value;

Expand Down Expand Up @@ -209,6 +211,8 @@ public static ObjectType getObjectType(Object value) {
return ObjectType.CovarianceTuple;
} else if (value instanceof VarianceTuple) {
return ObjectType.VarianceTuple;
} else if (value instanceof PinotFourthMoment) {
return ObjectType.PinotFourthMoment;
} else {
throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
}
Expand Down Expand Up @@ -483,6 +487,23 @@ public VarianceTuple deserialize(ByteBuffer byteBuffer) {
}
};

/**
 * Serializer/deserializer for {@link PinotFourthMoment} values.
 *
 * <p>All three operations delegate directly to {@link PinotFourthMoment}: {@code serialize()} for writing and
 * the {@code fromBytes(...)} overloads for reading.
 */
public static final ObjectSerDe<PinotFourthMoment> PINOT_FOURTH_MOMENT_OBJECT_SER_DE =
    new ObjectSerDe<PinotFourthMoment>() {

      /** Returns the byte representation produced by {@code value.serialize()}. */
      @Override
      public byte[] serialize(PinotFourthMoment value) {
        return value.serialize();
      }

      /** Rebuilds a {@link PinotFourthMoment} from a byte array via {@code PinotFourthMoment.fromBytes}. */
      @Override
      public PinotFourthMoment deserialize(byte[] bytes) {
        return PinotFourthMoment.fromBytes(bytes);
      }

      /** Rebuilds a {@link PinotFourthMoment} from a {@link ByteBuffer} via {@code PinotFourthMoment.fromBytes}. */
      @Override
      public PinotFourthMoment deserialize(ByteBuffer byteBuffer) {
        return PinotFourthMoment.fromBytes(byteBuffer);
      }
    };

public static final ObjectSerDe<HyperLogLog> HYPER_LOG_LOG_SER_DE = new ObjectSerDe<HyperLogLog>() {

@Override
Expand Down Expand Up @@ -1213,7 +1234,8 @@ public Double2LongOpenHashMap deserialize(ByteBuffer byteBuffer) {
DOUBLE_LONG_PAIR_SER_DE,
STRING_LONG_PAIR_SER_DE,
COVARIANCE_TUPLE_OBJECT_SER_DE,
VARIANCE_TUPLE_OBJECT_SER_DE
VARIANCE_TUPLE_OBJECT_SER_DE,
PINOT_FOURTH_MOMENT_OBJECT_SER_DE
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
return new VarianceAggregationFunction(firstArgument, false, true);
case STDDEVSAMP:
return new VarianceAggregationFunction(firstArgument, true, true);
case SKEWPOP:
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.SKEW);
case KURTOSISPOP:
return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
default:
throw new IllegalArgumentException();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.query.aggregation.function;

import java.util.Map;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils;
import org.apache.pinot.segment.local.customobject.PinotFourthMoment;
import org.apache.pinot.segment.spi.AggregationFunctionType;


/**
 * Aggregation function behind the {@code SKEWPOP} and {@code KURTOSISPOP} aggregation types.
 *
 * <p>Each input value is fed into a {@link PinotFourthMoment} accumulator through {@code increment}. The final
 * result is read from the accumulator's {@code skew()} or {@code kurtosis()} method, selected by the
 * {@link Type} passed to the constructor.
 */
public class FourthMomentAggregationFunction extends BaseSingleInputAggregationFunction<PinotFourthMoment, Double> {

  /** Selects which statistic {@link #extractFinalResult(PinotFourthMoment)} reads from the accumulator. */
  enum Type {
    KURTOSIS,
    SKEW
  }

  private final Type _type;

  /**
   * @param expression input expression whose values are aggregated
   * @param type statistic to report as the final result
   */
  public FourthMomentAggregationFunction(ExpressionContext expression, Type type) {
    super(expression);
    _type = type;
  }

  /**
   * Maps the configured {@link Type} to the matching {@link AggregationFunctionType}.
   *
   * @throws IllegalArgumentException if the configured type is neither KURTOSIS nor SKEW
   */
  @Override
  public AggregationFunctionType getType() {
    switch (_type) {
      case SKEW:
        return AggregationFunctionType.SKEWPOP;
      case KURTOSIS:
        return AggregationFunctionType.KURTOSISPOP;
      default:
        throw new IllegalArgumentException("Unexpected type " + _type);
    }
  }

  @Override
  public AggregationResultHolder createAggregationResultHolder() {
    return new ObjectAggregationResultHolder();
  }

  @Override
  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
  }

  /**
   * Feeds the first {@code length} input values into the single accumulator stored in the holder, creating and
   * storing a new accumulator first when the holder has none.
   */
  @Override
  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
      Map<ExpressionContext, BlockValSet> blockValSetMap) {
    double[] inputs = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);

    PinotFourthMoment moment = aggregationResultHolder.getResult();
    if (moment == null) {
      moment = new PinotFourthMoment();
      aggregationResultHolder.setValue(moment);
    }

    for (int row = 0; row < length; row++) {
      moment.increment(inputs[row]);
    }
  }

  /** Feeds each of the first {@code length} input values into the accumulator of that row's single group key. */
  @Override
  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
      Map<ExpressionContext, BlockValSet> blockValSetMap) {
    double[] inputs = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
    for (int row = 0; row < length; row++) {
      momentForGroup(groupByResultHolder, groupKeyArray[row]).increment(inputs[row]);
    }
  }

  /** Feeds each of the first {@code length} input values into the accumulator of every group key of that row. */
  @Override
  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
      Map<ExpressionContext, BlockValSet> blockValSetMap) {
    double[] inputs = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression);
    for (int row = 0; row < length; row++) {
      int[] rowGroupKeys = groupKeysArray[row];
      for (int groupKey : rowGroupKeys) {
        momentForGroup(groupByResultHolder, groupKey).increment(inputs[row]);
      }
    }
  }

  /** Returns the accumulator stored in the holder, or a new empty accumulator when the holder has none. */
  @Override
  public PinotFourthMoment extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
    PinotFourthMoment moment = aggregationResultHolder.getResult();
    return moment != null ? moment : new PinotFourthMoment();
  }

  /** Returns the accumulator stored for {@code groupKey}, or a new empty accumulator when none is stored. */
  @Override
  public PinotFourthMoment extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
    PinotFourthMoment moment = groupByResultHolder.getResult(groupKey);
    return moment != null ? moment : new PinotFourthMoment();
  }

  /** Calls {@code combine} on the first intermediate result with the second, and returns the first instance. */
  @Override
  public PinotFourthMoment merge(PinotFourthMoment intermediateResult1, PinotFourthMoment intermediateResult2) {
    intermediateResult1.combine(intermediateResult2);
    return intermediateResult1;
  }

  @Override
  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
    return DataSchema.ColumnDataType.OBJECT;
  }

  @Override
  public DataSchema.ColumnDataType getFinalResultColumnType() {
    return DataSchema.ColumnDataType.DOUBLE;
  }

  /**
   * Reads the configured statistic from the accumulator.
   *
   * @return {@code null} when {@code m4} is {@code null}; otherwise {@code m4.kurtosis()} or {@code m4.skew()}
   * @throws IllegalStateException if the configured type is neither KURTOSIS nor SKEW
   */
  @Override
  public Double extractFinalResult(PinotFourthMoment m4) {
    if (m4 == null) {
      return null;
    }

    switch (_type) {
      case SKEW:
        return m4.skew();
      case KURTOSIS:
        return m4.kurtosis();
      default:
        throw new IllegalStateException("Unexpected value: " + _type);
    }
  }

  /** Returns the accumulator stored for {@code groupKey}, creating and storing a new one when none exists yet. */
  private static PinotFourthMoment momentForGroup(GroupByResultHolder holder, int groupKey) {
    PinotFourthMoment existing = holder.getResult(groupKey);
    if (existing != null) {
      return existing;
    }
    PinotFourthMoment created = new PinotFourthMoment();
    holder.setValueForKey(groupKey, created);
    return created;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,20 @@ public void testGetAggregationFunction() {
assertEquals(aggregationFunction.getType(), AggregationFunctionType.BOOLOR);
assertEquals(aggregationFunction.getColumnName(), "boolOr_column");
assertEquals(aggregationFunction.getResultColumnName(), "boolor(column)");

function = getFunction("skew_pop");
aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction);
assertEquals(aggregationFunction.getType(), AggregationFunctionType.SKEWPOP);
assertEquals(aggregationFunction.getColumnName(), "skewPop_column");
assertEquals(aggregationFunction.getResultColumnName(), "skewpop(column)");

function = getFunction("kurtosis_pop");
aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT);
assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction);
assertEquals(aggregationFunction.getType(), AggregationFunctionType.KURTOSISPOP);
assertEquals(aggregationFunction.getColumnName(), "kurtosisPop_column");
assertEquals(aggregationFunction.getResultColumnName(), "kurtosispop(column)");
}

private FunctionContext getFunction(String functionName) {
Expand Down
Loading

0 comments on commit 49607d7

Please sign in to comment.