diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java index 6fbacc5fcd8d..fbfee474e9af 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java @@ -70,6 +70,7 @@ import org.apache.pinot.segment.local.customobject.IntLongPair; import org.apache.pinot.segment.local.customobject.LongLongPair; import org.apache.pinot.segment.local.customobject.MinMaxRangePair; +import org.apache.pinot.segment.local.customobject.PinotFourthMoment; import org.apache.pinot.segment.local.customobject.QuantileDigest; import org.apache.pinot.segment.local.customobject.StringLongPair; import org.apache.pinot.segment.local.customobject.VarianceTuple; @@ -125,7 +126,8 @@ public enum ObjectType { DoubleLongPair(30), StringLongPair(31), CovarianceTuple(32), - VarianceTuple(33); + VarianceTuple(33), + PinotFourthMoment(34); private final int _value; @@ -209,6 +211,8 @@ public static ObjectType getObjectType(Object value) { return ObjectType.CovarianceTuple; } else if (value instanceof VarianceTuple) { return ObjectType.VarianceTuple; + } else if (value instanceof PinotFourthMoment) { + return ObjectType.PinotFourthMoment; } else { throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName()); } @@ -483,6 +487,24 @@ public VarianceTuple deserialize(ByteBuffer byteBuffer) { } }; + public static final ObjectSerDe PINOT_FOURTH_MOMENT_OBJECT_SER_DE + = new ObjectSerDe() { + @Override + public byte[] serialize(PinotFourthMoment value) { + return value.serialize(); + } + + @Override + public PinotFourthMoment deserialize(byte[] bytes) { + return PinotFourthMoment.fromBytes(bytes); + } + + @Override + public PinotFourthMoment deserialize(ByteBuffer byteBuffer) { + return PinotFourthMoment.fromBytes(byteBuffer); + } + }; + public static final ObjectSerDe HYPER_LOG_LOG_SER_DE = new ObjectSerDe() { @Override @@ -1213,7 +1235,8 @@ public Double2LongOpenHashMap deserialize(ByteBuffer byteBuffer) { DOUBLE_LONG_PAIR_SER_DE, STRING_LONG_PAIR_SER_DE, COVARIANCE_TUPLE_OBJECT_SER_DE, - VARIANCE_TUPLE_OBJECT_SER_DE + VARIANCE_TUPLE_OBJECT_SER_DE, + PINOT_FOURTH_MOMENT_OBJECT_SER_DE }; //@formatter:on diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java index 2758d193a238..4ba341322153 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java @@ -285,6 +285,10 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio return new VarianceAggregationFunction(firstArgument, false, true); case STDDEVSAMP: return new VarianceAggregationFunction(firstArgument, true, true); + case SKEWNESS: + return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.SKEWNESS); + case KURTOSIS: + return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS); default: throw new IllegalArgumentException(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java new file mode 100644 index 000000000000..9cb06e4eebed --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/FourthMomentAggregationFunction.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.aggregation.function; + +import java.util.Map; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.query.aggregation.AggregationResultHolder; +import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; +import org.apache.pinot.core.query.aggregation.utils.StatisticalAggregationFunctionUtils; +import org.apache.pinot.segment.local.customobject.PinotFourthMoment; +import org.apache.pinot.segment.spi.AggregationFunctionType; + + +public class FourthMomentAggregationFunction extends BaseSingleInputAggregationFunction { + + private final Type _type; + + enum Type { + KURTOSIS, SKEWNESS + } + + public FourthMomentAggregationFunction(ExpressionContext expression, Type type) { + super(expression); + _type = type; + } + + @Override + public AggregationFunctionType getType() { + switch (_type) { + case KURTOSIS: + return AggregationFunctionType.KURTOSIS; + case SKEWNESS: + return AggregationFunctionType.SKEWNESS; + default: + throw new IllegalArgumentException("Unexpected type " + _type); + } + } + + @Override + public AggregationResultHolder createAggregationResultHolder() { + return new ObjectAggregationResultHolder(); + } + + @Override + public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) { + return new ObjectGroupByResultHolder(initialCapacity, maxCapacity); + } + + @Override + public void aggregate(int length, AggregationResultHolder aggregationResultHolder, + Map blockValSetMap) { + double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression); + + PinotFourthMoment m4 = aggregationResultHolder.getResult(); + if (m4 == null) { + m4 = new PinotFourthMoment(); + aggregationResultHolder.setValue(m4); + } + + for (int i = 0; i < length; i++) { + m4.increment(values[i]); + } + } + + @Override + public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map blockValSetMap) { + double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression); + for (int i = 0; i < length; i++) { + PinotFourthMoment m4 = groupByResultHolder.getResult(groupKeyArray[i]); + if (m4 == null) { + m4 = new PinotFourthMoment(); + groupByResultHolder.setValueForKey(groupKeyArray[i], m4); + } + m4.increment(values[i]); + } + } + + @Override + public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map blockValSetMap) { + double[] values = StatisticalAggregationFunctionUtils.getValSet(blockValSetMap, _expression); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + PinotFourthMoment m4 = groupByResultHolder.getResult(groupKey); + if (m4 == null) { + m4 = new PinotFourthMoment(); + groupByResultHolder.setValueForKey(groupKey, m4); + } + m4.increment(values[i]); + } + } + } + + @Override + public PinotFourthMoment extractAggregationResult(AggregationResultHolder aggregationResultHolder) { + PinotFourthMoment m4 = aggregationResultHolder.getResult(); + if (m4 == null) { + return new PinotFourthMoment(); + } else { + return m4; + } + } + + @Override + public PinotFourthMoment extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { + PinotFourthMoment m4 = groupByResultHolder.getResult(groupKey); + if (m4 == null) { + return new PinotFourthMoment(); + } else { + return m4; + } + } + + @Override + public PinotFourthMoment merge(PinotFourthMoment intermediateResult1, PinotFourthMoment intermediateResult2) { + intermediateResult1.combine(intermediateResult2); + return intermediateResult1; + } + + @Override + public DataSchema.ColumnDataType getIntermediateResultColumnType() { + return DataSchema.ColumnDataType.OBJECT; + } + + @Override + public DataSchema.ColumnDataType getFinalResultColumnType() { + return DataSchema.ColumnDataType.DOUBLE; + } + + @Override + public Double extractFinalResult(PinotFourthMoment m4) { + if (m4 == null) { + return null; + } + + switch (_type) { + case KURTOSIS: + return m4.kurtosis(); + case SKEWNESS: + return m4.skew(); + default: + throw new IllegalStateException("Unexpected value: " + _type); + } + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java index 144949a0d5d3..a29694ef79e8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactoryTest.java @@ -458,6 +458,20 @@ public void testGetAggregationFunction() { assertEquals(aggregationFunction.getType(), AggregationFunctionType.BOOLOR); assertEquals(aggregationFunction.getColumnName(), "boolOr_column"); assertEquals(aggregationFunction.getResultColumnName(), "boolor(column)"); + + function = getFunction("skewness"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.SKEWNESS); + assertEquals(aggregationFunction.getColumnName(), "skewness_column"); + assertEquals(aggregationFunction.getResultColumnName(), "skewness(column)"); + + function = getFunction("kurtosis"); + aggregationFunction = AggregationFunctionFactory.getAggregationFunction(function, DUMMY_QUERY_CONTEXT); + assertTrue(aggregationFunction instanceof FourthMomentAggregationFunction); + assertEquals(aggregationFunction.getType(), AggregationFunctionType.KURTOSIS); + assertEquals(aggregationFunction.getColumnName(), "kurtosis_column"); + assertEquals(aggregationFunction.getResultColumnName(), "kurtosis(column)"); } private FunctionContext getFunction(String functionName) { diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java index 05defe18f041..9365006b3572 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/StatisticalQueriesTest.java @@ -28,6 +28,8 @@ import java.util.Random; import org.apache.commons.io.FileUtils; import org.apache.commons.math3.stat.correlation.Covariance; +import org.apache.commons.math3.stat.descriptive.moment.Kurtosis; +import org.apache.commons.math3.stat.descriptive.moment.Skewness; import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; import org.apache.commons.math3.stat.descriptive.moment.Variance; import org.apache.commons.math3.util.Precision; @@ -39,6 +41,7 @@ import org.apache.pinot.core.operator.query.GroupByOperator; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.segment.local.customobject.CovarianceTuple; +import org.apache.pinot.segment.local.customobject.PinotFourthMoment; import org.apache.pinot.segment.local.customobject.VarianceTuple; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; @@ -705,6 +708,172 @@ public void testStandardDeviationAggreagtionGroupBy() { } } + @Test + public void testSkewAggregationOnly() { + // Compute the expected values + Skewness[] expectedSkew = new Skewness[4]; + for (int i = 0; i < 4; i++) { + expectedSkew[i] = new Skewness(); + } + + for (int i = 0; i < NUM_RECORDS; i++) { + expectedSkew[0].increment(_intColX[i]); + expectedSkew[1].increment(_longCol[i]); + expectedSkew[2].increment(_floatCol[i]); + expectedSkew[3].increment(_doubleColX[i]); + } + + // Compute the query + String query = + "SELECT SKEWNESS(intColumnX), SKEWNESS(longColumn), SKEWNESS(floatColumn), SKEWNESS(doubleColumnX) " + + "FROM testTable"; + AggregationOperator aggregationOperator = getOperator(query); + AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0, + NUM_RECORDS * 4, NUM_RECORDS); + List aggregationResult = resultsBlock.getResults(); + + // Validate the aggregation results + checkWithPrecisionForSkew((PinotFourthMoment) aggregationResult.get(0), NUM_RECORDS, expectedSkew[0].getResult()); + checkWithPrecisionForSkew((PinotFourthMoment) aggregationResult.get(1), NUM_RECORDS, expectedSkew[1].getResult()); + checkWithPrecisionForSkew((PinotFourthMoment) aggregationResult.get(2), NUM_RECORDS, expectedSkew[2].getResult()); + checkWithPrecisionForSkew((PinotFourthMoment) aggregationResult.get(3), NUM_RECORDS, expectedSkew[3].getResult()); + + // Validate the response + BrokerResponseNative brokerResponse = getBrokerResponse(query); + brokerResponse.getResultTable(); + Object[] results = brokerResponse.getResultTable().getRows().get(0); + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[0], expectedSkew[0].getResult(), RELATIVE_EPSILON)); + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[1], expectedSkew[1].getResult(), RELATIVE_EPSILON)); + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[2], expectedSkew[2].getResult(), RELATIVE_EPSILON)); + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[3], expectedSkew[3].getResult(), RELATIVE_EPSILON)); + + // Validate the response for a query with a filter + query = "SELECT SKEWNESS(intColumnX) from testTable" + getFilter(); + brokerResponse = getBrokerResponse(query); + brokerResponse.getResultTable(); + results = brokerResponse.getResultTable().getRows().get(0); + Skewness filterExpectedSkew = new Skewness(); + for (int i = 0; i < NUM_RECORDS / 2; i++) { + filterExpectedSkew.increment(_intColX[i]); + } + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedSkew.getResult(), RELATIVE_EPSILON)); + } + + @Test + public void testSkewAggregationGroupBy() { + // Compute expected group results + Skewness[] expectedGroupByResult = new Skewness[NUM_GROUPS]; + + for (int i = 0; i < NUM_GROUPS; i++) { + expectedGroupByResult[i] = new Skewness(); + } + for (int j = 0; j < NUM_RECORDS; j++) { + int pos = j / (NUM_RECORDS / NUM_GROUPS); + expectedGroupByResult[pos].increment(_intColX[j]); + } + + String query = "SELECT SKEWNESS(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn"; + GroupByOperator groupByOperator = getOperator(query); + GroupByResultsBlock resultsBlock = groupByOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, + NUM_RECORDS * 2, NUM_RECORDS); + AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + assertNotNull(aggregationGroupByResult); + for (int i = 0; i < NUM_GROUPS; i++) { + PinotFourthMoment actual = (PinotFourthMoment) aggregationGroupByResult.getResultForGroupId(0, i); + checkWithPrecisionForSkew(actual, NUM_RECORDS / NUM_GROUPS, expectedGroupByResult[i].getResult()); + } + } + + @Test + public void testKurtosisAggregationOnly() { + // Compute the expected values + Kurtosis[] expectedKurt = new Kurtosis[4]; + for (int i = 0; i < 4; i++) { + expectedKurt[i] = new Kurtosis(); + } + + for (int i = 0; i < NUM_RECORDS; i++) { + expectedKurt[0].increment(_intColX[i]); + expectedKurt[1].increment(_longCol[i]); + expectedKurt[2].increment(_floatCol[i]); + expectedKurt[3].increment(_doubleColX[i]); + } + + // Compute the query + String query = + "SELECT KURTOSIS(intColumnX), KURTOSIS(longColumn), KURTOSIS(floatColumn), " + + "KURTOSIS(doubleColumnX) FROM testTable"; + AggregationOperator aggregationOperator = getOperator(query); + AggregationResultsBlock resultsBlock = aggregationOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(aggregationOperator.getExecutionStatistics(), NUM_RECORDS, 0, + NUM_RECORDS * 4, NUM_RECORDS); + List aggregationResult = resultsBlock.getResults(); + + // Validate the aggregation results + checkWithPrecisionForKurt((PinotFourthMoment) aggregationResult.get(0), NUM_RECORDS, expectedKurt[0].getResult()); + checkWithPrecisionForKurt((PinotFourthMoment) aggregationResult.get(1), NUM_RECORDS, expectedKurt[1].getResult()); + checkWithPrecisionForKurt((PinotFourthMoment) aggregationResult.get(2), NUM_RECORDS, expectedKurt[2].getResult()); + checkWithPrecisionForKurt((PinotFourthMoment) aggregationResult.get(3), NUM_RECORDS, expectedKurt[3].getResult()); + + // Validate the response + BrokerResponseNative brokerResponse = getBrokerResponse(query); + brokerResponse.getResultTable(); + Object[] results = brokerResponse.getResultTable().getRows().get(0); + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[0], expectedKurt[0].getResult(), RELATIVE_EPSILON)); + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[1], expectedKurt[1].getResult(), RELATIVE_EPSILON)); + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[2], expectedKurt[2].getResult(), RELATIVE_EPSILON)); + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[3], expectedKurt[3].getResult(), RELATIVE_EPSILON)); + + // Validate the response for a query with a filter + query = "SELECT KURTOSIS(intColumnX) from testTable" + getFilter(); + brokerResponse = getBrokerResponse(query); + brokerResponse.getResultTable(); + results = brokerResponse.getResultTable().getRows().get(0); + Kurtosis filterExpectedKurt = new Kurtosis(); + for (int i = 0; i < NUM_RECORDS / 2; i++) { + filterExpectedKurt.increment(_intColX[i]); + } + assertTrue( + Precision.equalsWithRelativeTolerance((double) results[0], filterExpectedKurt.getResult(), RELATIVE_EPSILON)); + } + + @Test + public void testKurtosisAggregationGroupBy() { + // Compute expected group results + Kurtosis[] expectedGroupByResult = new Kurtosis[NUM_GROUPS]; + + for (int i = 0; i < NUM_GROUPS; i++) { + expectedGroupByResult[i] = new Kurtosis(); + } + for (int j = 0; j < NUM_RECORDS; j++) { + int pos = j / (NUM_RECORDS / NUM_GROUPS); + expectedGroupByResult[pos].increment(_intColX[j]); + } + + String query = "SELECT KURTOSIS(intColumnX) FROM testTable GROUP BY groupByColumn ORDER BY groupByColumn"; + GroupByOperator groupByOperator = getOperator(query); + GroupByResultsBlock resultsBlock = groupByOperator.nextBlock(); + QueriesTestUtils.testInnerSegmentExecutionStatistics(groupByOperator.getExecutionStatistics(), NUM_RECORDS, 0, + NUM_RECORDS * 2, NUM_RECORDS); + AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + assertNotNull(aggregationGroupByResult); + for (int i = 0; i < NUM_GROUPS; i++) { + PinotFourthMoment actual = (PinotFourthMoment) aggregationGroupByResult.getResultForGroupId(0, i); + checkWithPrecisionForKurt(actual, NUM_RECORDS / NUM_GROUPS, expectedGroupByResult[i].getResult()); + } + } + private void checkWithPrecisionForCovariance(CovarianceTuple tuple, double sumX, double sumY, double sumXY, int count) { assertEquals(tuple.getCount(), count); @@ -766,6 +935,16 @@ private void checkWithPrecisionForStandardDeviation(VarianceTuple tuple, int exp } } + private void checkWithPrecisionForSkew(PinotFourthMoment m4, int expectedCount, double expectedSkew) { + assertEquals(m4.getN(), expectedCount); + assertTrue(Precision.equalsWithRelativeTolerance(m4.skew(), expectedSkew, RELATIVE_EPSILON)); + } + + private void checkWithPrecisionForKurt(PinotFourthMoment m4, int expectedCount, double expectedSkew) { + assertEquals(m4.getN(), expectedCount); + assertTrue(Precision.equalsWithRelativeTolerance(m4.kurtosis(), expectedSkew, RELATIVE_EPSILON)); + } + private double computeVariancePop(VarianceTuple varianceTuple) { return varianceTuple.getM2() / varianceTuple.getCount(); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java new file mode 100644 index 000000000000..f5ebb46a7676 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/customobject/PinotFourthMoment.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.customobject; + +import java.nio.ByteBuffer; +import java.util.Comparator; +import org.apache.commons.math.stat.descriptive.moment.FourthMoment; +import org.apache.commons.math.stat.descriptive.moment.Kurtosis; +import org.apache.commons.math.stat.descriptive.moment.Skewness; + + +/** + * A {@link Comparable} implementation of the + * Fourth Statistical Moment that uses the apache commons algorithm for computing it in + * one pass. It additionally supports serialization and deserialization methods, which is helpful + * for combining moments across servers. + * + *

The commons implementation does not support parallel-computation, support for which is added + * in the {@link #combine(PinotFourthMoment)} method inspired by Presto's implementation. + * + *

+ * Also See: Presto's Implementation
+ * 
+ */ +public class PinotFourthMoment extends FourthMoment implements Comparable { + + private static final Comparator COMPARATOR = Comparator.naturalOrder() + .thenComparingLong(x -> x.n) + .thenComparingDouble(x -> x.m1) + .thenComparingDouble(x -> x.m2) + .thenComparingDouble(x -> x.m3) + .thenComparingDouble(x -> x.m4); + + public void combine(PinotFourthMoment other) { + combine(other.n, other.m1, other.m2, other.m3, other.m4); + } + + public void combine(long bN, double bM1, double bM2, double bM3, double bM4) { + if (bN == 0) { + return; + } else if (n == 0) { + n = bN; + m1 = bM1; + m2 = bM2; + m3 = bM3; + m4 = bM4; + return; + } + + long aN = n; + double aM1 = m1; + double aM2 = m2; + double aM3 = m3; + double aM4 = m4; + + long n = aN + bN; + double m1 = (aN * aM1 + bN * bM1) / n; + + double delta = bM1 - aM1; + double delta2 = delta * delta; + double m2 = aM2 + bM2 + delta2 * aN * bN / n; + + double delta3 = delta2 * delta; + double m3 = aM3 + bM3 + + delta3 * aN * bN * (aN - bN) / (n * n) + + 3d * delta * (aN * bM2 - bN * aM2) / n; + + double delta4 = delta3 * delta; + double n3 = ((double) n) * n * n; // avoid overflow + double m4 = aM4 + bM4 + + delta4 * aN * bN * (aN * aN - aN * bN + bN * bN) / (n3) + + 6.0 * delta2 * (aN * aN * bM2 + bN * bN * aM2) / (n * n) + + 4d * delta * (aN * bM3 - bN * aM3) / n; + + this.n = n; + this.m1 = m1; + this.m2 = m2; + this.m3 = m3; + this.m4 = m4; + } + + public double skew() { + return new Skewness(this).getResult(); + } + + public double kurtosis() { + return new Kurtosis(this).getResult(); + } + + public byte[] serialize() { + ByteBuffer buff = ByteBuffer.allocate(Long.BYTES + Double.BYTES * 4); + buff.putLong(n) + .putDouble(m1) + .putDouble(m2) + .putDouble(m3) + .putDouble(m4); + return buff.array(); + } + + public static PinotFourthMoment fromBytes(byte[] bytes) { + return fromBytes(ByteBuffer.wrap(bytes)); + } + + public static PinotFourthMoment fromBytes(ByteBuffer buff) { + PinotFourthMoment moment = new PinotFourthMoment(); + moment.n = buff.getLong(); + moment.m1 = buff.getDouble(); + moment.m2 = buff.getDouble(); + moment.m3 = buff.getDouble(); + moment.m4 = buff.getDouble(); + return moment; + } + + @Override + public int compareTo(PinotFourthMoment o) { + return COMPARATOR.compare(this, o); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java new file mode 100644 index 000000000000..727bea7d8773 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/customobject/PinotFourthMomentTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.segment.local.customobject; + +import java.util.Random; +import java.util.stream.IntStream; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class PinotFourthMomentTest { + + @Test + public void shouldCombineMoments() { + // Given: + Random r = new Random(); + double[] xs = IntStream.generate(r::nextInt) + .limit(100) + .mapToDouble(i -> (double) i) + .toArray(); + + PinotFourthMoment a = new PinotFourthMoment(); + PinotFourthMoment b = new PinotFourthMoment(); + PinotFourthMoment c = new PinotFourthMoment(); + + // When: + for (int i = 0; i < xs.length; i++) { + a.increment(xs[i]); + (i < xs.length / 2 ? b : c).increment(xs[i]); + } + b.combine(c); + + // Then: + assertEquals(b.skew(), a.skew(), .01); + assertEquals(b.kurtosis(), a.kurtosis(), .01); + } + + @Test + public void shouldCombineLeftEmptyMoments() { + // Given: + Random r = new Random(); + double[] xs = IntStream.generate(r::nextInt) + .limit(100) + .mapToDouble(i -> (double) i) + .toArray(); + + PinotFourthMoment a = new PinotFourthMoment(); + PinotFourthMoment b = new PinotFourthMoment(); + + // When: + for (double x : xs) { + a.increment(x); + } + + b.combine(a); + + // Then: + assertEquals(b.kurtosis(), a.kurtosis(), .01); + } + + @Test + public void shouldCombineRightEmptyMoments() { + // Given: + Random r = new Random(); + double[] xs = IntStream.generate(r::nextInt) + .limit(100) + .mapToDouble(i -> (double) i) + .toArray(); + + PinotFourthMoment a = new PinotFourthMoment(); + PinotFourthMoment b = new PinotFourthMoment(); + + // When: + for (double x : xs) { + a.increment(x); + } + + double kurtosisBeforeCombine = a.kurtosis(); + a.combine(b); + + // Then: + assertEquals(a.kurtosis(), kurtosisBeforeCombine, .01); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java index 584c99a7b2e8..beaac5718d8a 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java @@ -63,6 +63,8 @@ public enum AggregationFunctionType { VARSAMP("varSamp"), STDDEVPOP("stdDevPop"), STDDEVSAMP("stdDevSamp"), + SKEWNESS("skewness"), + KURTOSIS("kurtosis"), // Geo aggregation functions STUNION("STUnion"),