Realtime pre-aggregation for Distinct Count HLL & Big Decimal #10926

Merged

pom.xml
@@ -129,5 +129,10 @@
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</dependency>
<dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
</project>

DistinctCountHLLStarTreeV2Test.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.startree.v2;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.util.Collections;
import java.util.Random;
import org.apache.pinot.segment.local.aggregator.DistinctCountHLLValueAggregator;
import org.apache.pinot.segment.local.aggregator.ValueAggregator;
@@ -31,7 +32,7 @@ public class DistinctCountHLLStarTreeV2Test extends BaseStarTreeV2Test<Object, H

@Override
ValueAggregator<Object, HyperLogLog> getValueAggregator() {
return new DistinctCountHLLValueAggregator();
return new DistinctCountHLLValueAggregator(Collections.emptyList());
}

@Override

PreAggregatedDistinctCountHLLStarTreeV2Test.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.startree.v2;

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.util.Collections;
import java.util.Random;
import org.apache.pinot.core.common.ObjectSerDeUtils;
import org.apache.pinot.segment.local.aggregator.DistinctCountHLLValueAggregator;
@@ -34,7 +35,7 @@ public class PreAggregatedDistinctCountHLLStarTreeV2Test extends BaseStarTreeV2T

@Override
ValueAggregator<Object, HyperLogLog> getValueAggregator() {
return new DistinctCountHLLValueAggregator();
return new DistinctCountHLLValueAggregator(Collections.emptyList());
}

@Override

SumPrecisionStarTreeV2Test.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.startree.v2;

import java.math.BigDecimal;
import java.util.Collections;
import java.util.Random;
import org.apache.pinot.segment.local.aggregator.SumPrecisionValueAggregator;
import org.apache.pinot.segment.local.aggregator.ValueAggregator;
@@ -31,7 +32,7 @@ public class SumPrecisionStarTreeV2Test extends BaseStarTreeV2Test<Object, BigDe

@Override
ValueAggregator<Object, BigDecimal> getValueAggregator() {
return new SumPrecisionValueAggregator();
return new SumPrecisionValueAggregator(Collections.emptyList());
}

@Override

DistinctCountHLLValueAggregator.java
@@ -20,19 +20,32 @@

import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import java.util.List;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.local.utils.CustomSerDeUtils;
import org.apache.pinot.segment.local.utils.HyperLogLogUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.CommonConstants;


public class DistinctCountHLLValueAggregator implements ValueAggregator<Object, HyperLogLog> {
public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;
private static final int DEFAULT_LOG2M_BYTE_SIZE = 180;

private final int _log2m;

// Byte size won't change once we get the initial aggregated value
private int _maxByteSize;

public DistinctCountHLLValueAggregator(List<ExpressionContext> arguments) {
// A single argument means we use the default log2m of 8
if (arguments.size() <= 1) {
_log2m = CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M;
} else {
_log2m = arguments.get(1).getLiteral().getIntValue();
}
}

@Override
public AggregationFunctionType getAggregationType() {
return AggregationFunctionType.DISTINCTCOUNTHLL;
@@ -49,12 +62,11 @@ public HyperLogLog getInitialAggregatedValue(Object rawValue) {
if (rawValue instanceof byte[]) {
byte[] bytes = (byte[]) rawValue;
initialValue = deserializeAggregatedValue(bytes);
_maxByteSize = Math.max(_maxByteSize, bytes.length);
_maxByteSize = bytes.length;
} else {
// TODO: Handle configurable log2m for StarTreeBuilder
initialValue = new HyperLogLog(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M);
initialValue = new HyperLogLog(_log2m);
initialValue.offer(rawValue);
_maxByteSize = Math.max(_maxByteSize, DEFAULT_LOG2M_BYTE_SIZE);
_maxByteSize = HyperLogLogUtils.byteSize(initialValue);
}
return initialValue;
}
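
For intuition: realtime pre-aggregation of DISTINCTCOUNTHLL works because HLL sketches merge losslessly, so unioning per-batch sketches estimates the same cardinality as offering every raw value to a single sketch. A minimal demo against the clearspring library added in the pom above:

import com.clearspring.analytics.stream.cardinality.HyperLogLog;

public class HllMergeDemo {
  public static void main(String[] args) throws Exception {
    // Two sketches built independently, e.g. one per consumed batch.
    HyperLogLog a = new HyperLogLog(8);
    HyperLogLog b = new HyperLogLog(8);
    for (int i = 0; i < 1000; i++) {
      a.offer("user-" + i);
    }
    for (int i = 500; i < 1500; i++) {
      b.offer("user-" + i);
    }
    // Fold b into a: the union estimates roughly 1500 distinct users,
    // within HLL error bounds for log2m = 8.
    a.addAll(b);
    System.out.println(a.cardinality());
  }
}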
@@ -90,7 +102,9 @@ public HyperLogLog cloneAggregatedValue(HyperLogLog value) {

@Override
public int getMaxAggregatedValueByteSize() {
return _maxByteSize;
// NOTE: For aggregated metrics, the initial aggregated value might not have been generated yet. Return the byte
// size based on log2m.
return _maxByteSize > 0 ? _maxByteSize : HyperLogLogUtils.byteSize(_log2m);
}
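
The fallback on _log2m is safe because a clearspring HyperLogLog serializes to a size determined by log2m alone, never by how many values it has seen. A small demo of that property, plus a back-of-the-envelope size formula; the six-registers-per-word packing is my reading of stream-lib's RegisterSet, and it also explains the old DEFAULT_LOG2M_BYTE_SIZE of 180 for the default log2m of 8:

import com.clearspring.analytics.stream.cardinality.HyperLogLog;

public class HllByteSizeDemo {
  // Estimated getBytes() length: two int headers (log2m and the register-set
  // byte count) plus 2^log2m five-bit registers packed six per 32-bit word.
  static int estimatedByteSize(int log2m) {
    int registers = 1 << log2m;
    int words = registers / 6 + 1;
    return 2 * Integer.BYTES + words * Integer.BYTES;
  }

  public static void main(String[] args) throws Exception {
    HyperLogLog empty = new HyperLogLog(8);
    HyperLogLog loaded = new HyperLogLog(8);
    for (int i = 0; i < 100_000; i++) {
      loaded.offer("value-" + i);
    }
    // All three print 180: the serialized size is fixed up front, which is
    // what makes a fixed-width slot in the mutable forward index viable.
    System.out.println(empty.getBytes().length);
    System.out.println(loaded.getBytes().length);
    System.out.println(estimatedByteSize(8));
  }
}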

@Override

SumPrecisionValueAggregator.java
@@ -18,7 +18,10 @@
*/
package org.apache.pinot.segment.local.aggregator;

import com.google.common.base.Preconditions;
import java.math.BigDecimal;
import java.util.List;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.BigDecimalUtils;
@@ -27,8 +30,23 @@
public class SumPrecisionValueAggregator implements ValueAggregator<Object, BigDecimal> {
public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;

private final int _fixedSize;

private int _maxByteSize;

/**
 * The optional second argument is the maximum precision. The scale is always stored as 2 bytes. An optional scale
 * parameter can be provided at query time, but during ingestion we do not limit the scale.
 */
public SumPrecisionValueAggregator(List<ExpressionContext> arguments) {
// A single argument means there is no cap on maximum precision, and hence no fixed size
if (arguments.size() <= 1) {
_fixedSize = -1;
} else {
_fixedSize = BigDecimalUtils.byteSizeForFixedPrecision(arguments.get(1).getLiteral().getIntValue());
}
}

@Override
public AggregationFunctionType getAggregationType() {
return AggregationFunctionType.SUMPRECISION;
@@ -42,14 +60,18 @@ public DataType getAggregatedValueType() {
@Override
public BigDecimal getInitialAggregatedValue(Object rawValue) {
BigDecimal initialValue = toBigDecimal(rawValue);
_maxByteSize = Math.max(_maxByteSize, BigDecimalUtils.byteSize(initialValue));
if (_fixedSize < 0) {
_maxByteSize = Math.max(_maxByteSize, BigDecimalUtils.byteSize(initialValue));
}
return initialValue;
}

@Override
public BigDecimal applyRawValue(BigDecimal value, Object rawValue) {
value = value.add(toBigDecimal(rawValue));
_maxByteSize = Math.max(_maxByteSize, BigDecimalUtils.byteSize(value));
if (_fixedSize < 0) {
_maxByteSize = Math.max(_maxByteSize, BigDecimalUtils.byteSize(value));
}
return value;
}

@@ -66,7 +88,9 @@ private static BigDecimal toBigDecimal(Object rawValue) {
@Override
public BigDecimal applyAggregatedValue(BigDecimal value, BigDecimal aggregatedValue) {
value = value.add(aggregatedValue);
_maxByteSize = Math.max(_maxByteSize, BigDecimalUtils.byteSize(value));
if (_fixedSize < 0) {
_maxByteSize = Math.max(_maxByteSize, BigDecimalUtils.byteSize(value));
}
return value;
}

Expand All @@ -78,12 +102,14 @@ public BigDecimal cloneAggregatedValue(BigDecimal value) {

@Override
public int getMaxAggregatedValueByteSize() {
return _maxByteSize;
Preconditions.checkState(_fixedSize > 0 || _maxByteSize > 0,
"Unknown max aggregated value byte size, please provide maximum precision as the second argument");
return _fixedSize > 0 ? _fixedSize : _maxByteSize;
}

@Override
public byte[] serializeAggregatedValue(BigDecimal value) {
return BigDecimalUtils.serialize(value);
return _fixedSize > 0 ? BigDecimalUtils.serializeWithSize(value, _fixedSize) : BigDecimalUtils.serialize(value);
}
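
For context on serializeWithSize: a fixed-width BigDecimal encoding can store the scale in 2 bytes (matching the class comment above) and left-pad the two's-complement unscaled value to a constant width. The sketch below illustrates the idea only; the helper names are hypothetical and the real BigDecimalUtils wire format may differ:

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class FixedWidthBigDecimalSketch {
  // Encode as [2-byte scale][unscaled value, sign-extended to a fixed width].
  static byte[] serializeWithSize(BigDecimal value, int fixedSize) {
    byte[] unscaled = value.unscaledValue().toByteArray();
    int payload = fixedSize - 2; // 2 bytes reserved for the scale
    if (unscaled.length > payload) {
      throw new IllegalArgumentException("Value does not fit in " + fixedSize + " bytes");
    }
    ByteBuffer buffer = ByteBuffer.allocate(fixedSize);
    buffer.putShort((short) value.scale());
    // Pad with 0xFF for negative values to keep two's complement intact.
    byte pad = value.signum() < 0 ? (byte) 0xFF : 0;
    for (int i = 0; i < payload - unscaled.length; i++) {
      buffer.put(pad);
    }
    buffer.put(unscaled);
    return buffer.array();
  }

  static BigDecimal deserialize(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    int scale = buffer.getShort();
    byte[] unscaled = new byte[bytes.length - 2];
    buffer.get(unscaled);
    return new BigDecimal(new BigInteger(unscaled), scale);
  }

  public static void main(String[] args) {
    byte[] encoded = serializeWithSize(new BigDecimal("-12345.67"), 16);
    System.out.println(encoded.length);       // always 16, regardless of value
    System.out.println(deserialize(encoded)); // -12345.67
  }
}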

@Override

ValueAggregatorFactory.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.segment.local.aggregator;

import java.util.List;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;

@@ -37,7 +39,8 @@ private ValueAggregatorFactory() {
* @param aggregationType Aggregation type
* @return Value aggregator
*/
public static ValueAggregator getValueAggregator(AggregationFunctionType aggregationType) {
public static ValueAggregator getValueAggregator(AggregationFunctionType aggregationType,
List<ExpressionContext> arguments) {
switch (aggregationType) {
case COUNT:
return new CountValueAggregator();
@@ -48,7 +51,7 @@ public static ValueAggregator getValueAggrega
case SUM:
return new SumValueAggregator();
case SUMPRECISION:
return new SumPrecisionValueAggregator();
return new SumPrecisionValueAggregator(arguments);
case AVG:
return new AvgValueAggregator();
case MINMAXRANGE:
@@ -57,7 +60,7 @@ public static ValueAggregator getValueAggrega
return new DistinctCountBitmapValueAggregator();
case DISTINCTCOUNTHLL:
case DISTINCTCOUNTRAWHLL:
return new DistinctCountHLLValueAggregator();
return new DistinctCountHLLValueAggregator(arguments);
case PERCENTILEEST:
case PERCENTILERAWEST:
return new PercentileEstValueAggregator();

MutableSegmentImpl.java
@@ -251,15 +251,27 @@ public boolean isMutableSegment() {
metricsAggregators = getMetricsAggregators(config);
}

Set<IndexType> specialIndexes = Sets.newHashSet(
StandardIndexes.dictionary(), // dictionaries implement other contract
StandardIndexes.nullValueVector()); // null value vector implement other contract
Set<IndexType> specialIndexes =
Sets.newHashSet(StandardIndexes.dictionary(), // dictionaries implement other contract
StandardIndexes.nullValueVector()); // null value vector implement other contract

// Initialize for each column
for (FieldSpec fieldSpec : _physicalFieldSpecs) {
String column = fieldSpec.getName();
FieldIndexConfigs indexConfigs = Optional.ofNullable(config.getIndexConfigByCol().get(column))
.orElse(FieldIndexConfigs.EMPTY);

int fixedByteSize = -1;
DataType dataType = fieldSpec.getDataType();
DataType storedType = dataType.getStoredType();
if (!storedType.isFixedWidth()) {
// For aggregated metrics, we need to store values with fixed byte size so that in-place replacement is possible
Pair<String, ValueAggregator> aggregatorPair = metricsAggregators.get(column);
if (aggregatorPair != null) {
fixedByteSize = aggregatorPair.getRight().getMaxAggregatedValueByteSize();
}
}

FieldIndexConfigs indexConfigs =
Optional.ofNullable(config.getIndexConfigByCol().get(column)).orElse(FieldIndexConfigs.EMPTY);
boolean isDictionary = !isNoDictionaryColumn(indexConfigs, fieldSpec, column);
MutableIndexContext context =
MutableIndexContext.builder().withFieldSpec(fieldSpec).withMemoryManager(_memoryManager)
Expand All @@ -268,7 +280,7 @@ public boolean isMutableSegment() {
.withEstimatedColSize(_statsHistory.getEstimatedAvgColSize(column))
.withAvgNumMultiValues(_statsHistory.getEstimatedAvgColSize(column))
.withConsumerDir(config.getConsumerDir() != null ? new File(config.getConsumerDir()) : null)
.build();
.withFixedLengthBytes(fixedByteSize).build();

// Partition info
PartitionFunction partitionFunction = null;
@@ -306,8 +318,7 @@ public boolean isMutableSegment() {
dictionary = null;
if (!fieldSpec.isSingleValueField()) {
// Raw MV columns
DataType dataType = fieldSpec.getDataType().getStoredType();
switch (dataType) {
switch (storedType) {
case INT:
case LONG:
case FLOAT:
@@ -416,15 +427,14 @@ private boolean isNoDictionaryColumn(FieldIndexConfigs indexConfigs, FieldSpec f
// if the column is part of noDictionary set from table config
if (fieldSpec instanceof DimensionFieldSpec && isAggregateMetricsEnabled() && (dataType == STRING
|| dataType == BYTES)) {
_logger.info(
"Aggregate metrics is enabled. Will create dictionary in consuming segment for column {} of type {}",
_logger.info("Aggregate metrics is enabled. Will create dictionary in consuming segment for column {} of type {}",
column, dataType);
return false;
}
// So don't create dictionary if the column (1) is member of noDictionary, and (2) is single-value or multi-value
// with a fixed-width field, and (3) doesn't have an inverted index
return (fieldSpec.isSingleValueField() || fieldSpec.getDataType().isFixedWidth())
&& indexConfigs.getConfig(StandardIndexes.inverted()).isDisabled();
return (fieldSpec.isSingleValueField() || fieldSpec.getDataType().isFixedWidth()) && indexConfigs.getConfig(
StandardIndexes.inverted()).isDisabled();
}

public SegmentPartitionConfig getSegmentPartitionConfig() {
@@ -603,10 +613,7 @@ private void addNewRow(int docId, GenericRow row) {

DataType dataType = fieldSpec.getDataType();
value = indexContainer._valueAggregator.getInitialAggregatedValue(value);
// aggregator value has to be numeric, but can be a different type of Number from the one expected on the column
// therefore we need to do some value changes here.
// TODO: Precision may change from one value to other, so we may need to study if this is actually what we want
// to do
// BIG_DECIMAL is actually stored as byte[] and hence can be supported here.
switch (dataType.getStoredType()) {
case INT:
forwardIndex.add(((Number) value).intValue(), -1, docId);
Expand All @@ -620,6 +627,10 @@ private void addNewRow(int docId, GenericRow row) {
case DOUBLE:
forwardIndex.add(((Number) value).doubleValue(), -1, docId);
break;
case BIG_DECIMAL:

[Review comment, Contributor] The above comment no longer applies. We should probably add some comment about using byte[] to support BIG_DECIMAL. It works because BIG_DECIMAL is actually stored as byte[] underneath.

case BYTES:
forwardIndex.add(indexContainer._valueAggregator.serializeAggregatedValue(value), -1, docId);
break;
default:
throw new UnsupportedOperationException(
"Unsupported data type: " + dataType + " for aggregation: " + column);
@@ -796,6 +807,11 @@ private void aggregateMetrics(GenericRow row, int docId) {
valueAggregator.getAggregatedValueType(), valueAggregator.getAggregationType(), dataType));
}
break;
case BYTES:
Object oldValue = valueAggregator.deserializeAggregatedValue(forwardIndex.getBytes(docId));
Object newValue = valueAggregator.applyRawValue(oldValue, value);
forwardIndex.setBytes(docId, valueAggregator.serializeAggregatedValue(newValue));
break;
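
The new BYTES branch is a read-modify-write cycle: deserialize the stored sketch, fold in the raw value, and write the result back into the same fixed-width slot. A self-contained sketch of that cycle; FixedWidthIndex is a stand-in interface for illustration, not Pinot's actual mutable forward index API:

import com.clearspring.analytics.stream.cardinality.HyperLogLog;

public class BytesAggregateSketch {
  // Stand-in for a fixed-width mutable forward index (hypothetical interface).
  interface FixedWidthIndex {
    byte[] getBytes(int docId);
    void setBytes(int docId, byte[] value);
  }

  static void aggregate(FixedWidthIndex index, int docId, Object rawValue) throws Exception {
    // 1. Read and deserialize the current aggregated value.
    HyperLogLog hll = HyperLogLog.Builder.build(index.getBytes(docId));
    // 2. Fold the new raw value into the sketch.
    hll.offer(rawValue);
    // 3. Serialize back; the byte size depends only on log2m, so the updated
    //    value fits exactly into the slot it came from.
    index.setBytes(docId, hll.getBytes());
  }
}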
default:
throw new UnsupportedOperationException(
String.format("Aggregation type %s of %s not supported for %s", valueAggregator.getAggregatedValueType(),
@@ -1198,8 +1214,8 @@ private static Map<String, Pair<String, ValueAggregator>> fromAggregateMetrics(R

Map<String, Pair<String, ValueAggregator>> columnNameToAggregator = new HashMap<>();
for (String metricName : segmentConfig.getSchema().getMetricNames()) {
columnNameToAggregator.put(metricName,
Pair.of(metricName, ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM)));
columnNameToAggregator.put(metricName, Pair.of(metricName,
ValueAggregatorFactory.getValueAggregator(AggregationFunctionType.SUM, Collections.emptyList())));
}
return columnNameToAggregator;
}
@@ -1215,18 +1231,15 @@ private static Map<String, Pair<String, ValueAggregator>> fromAggregationConfig(
Preconditions.checkState(expressionContext.getType() == ExpressionContext.Type.FUNCTION,
"aggregation function must be a function: %s", config);
FunctionContext functionContext = expressionContext.getFunction();
TableConfigUtils.validateIngestionAggregation(functionContext.getFunctionName());
Preconditions.checkState(functionContext.getArguments().size() == 1,
"aggregation function can only have one argument: %s", config);
AggregationFunctionType functionType =
AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());
TableConfigUtils.validateIngestionAggregation(functionType);
ExpressionContext argument = functionContext.getArguments().get(0);
Preconditions.checkState(argument.getType() == ExpressionContext.Type.IDENTIFIER,
"aggregator function argument must be a identifier: %s", config);

AggregationFunctionType functionType =
AggregationFunctionType.getAggregationFunctionType(functionContext.getFunctionName());

columnNameToAggregator.put(config.getColumnName(),
Pair.of(argument.getIdentifier(), ValueAggregatorFactory.getValueAggregator(functionType)));
columnNameToAggregator.put(config.getColumnName(), Pair.of(argument.getIdentifier(),
ValueAggregatorFactory.getValueAggregator(functionType, functionContext.getArguments())));
}

return columnNameToAggregator;
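
With the full argument list passed through, an ingestion aggregation along the lines of distinctCountHLL(userId, 12) can now carry log2m as its second, literal argument, and sumPrecision(amount, 20) a maximum precision; the first argument must still be a plain column identifier, as the identifier check above enforces. (The exact spelling inside the table config's aggregationConfigs is illustrative here.)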
@@ -1290,8 +1303,8 @@ public void close() {
closeable.close();
}
} catch (Exception e) {
_logger.error("Caught exception while closing {} index for column: {}, continuing with error",
indexType, column, e);
_logger.error("Caught exception while closing {} index for column: {}, continuing with error", indexType,
column, e);
}
};
