Realtime pre-aggregation for Distinct Count HLL & Big Decimal #10926
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##           master   #10926      +/-   ##
==========================================
- Coverage    0.11%    0.11%   -0.01%
==========================================
  Files        2218     2225       +7
  Lines      119138   119440     +302
  Branches    18022    18092      +70
==========================================
  Hits          137      137
- Misses     118981   119283     +302
  Partials       20       20
```

... and 90 files with indirect coverage changes
This is awesome!
Currently we don't have null handling for aggregate metrics. `nullHandlingEnabled` is modeled as a query option, and aggregate metrics rely on the behavior of the default null value. If we want to handle null (basically ignore null for aggregate metrics), we should introduce a config for that, and I think that can be done as a separate PR
Resolved (outdated): pinot-core/src/test/java/org/apache/pinot/core/common/ObjectSerDeUtilsTest.java
Resolved (outdated): ...src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
Resolved (outdated): ...src/main/java/org/apache/pinot/segment/local/aggregator/DistinctCountHLLValueAggregator.java
```java
if (bytes == null || bytes.length == 0) {
  return new HyperLogLog(_log2m);
}
```
Is this needed? We don't have this special handling in other functions
`deserializeAggregatedValue` is often called from `getInitialAggregatedValue` in some of the implementations (not just HLL or sum precision); one example is `AvgValueAggregator` when the input is of type bytes. So to cover that case, I also do it here
I don't follow. Even if it is used in `getInitialAggregatedValue()`, the input should never be null or empty. Are we trying to handle invalid input data (e.g. an empty byte array)? If so, the handling should be added to `getInitialAggregatedValue()` and `applyRawValue()` instead of here
```java
@@ -37,11 +37,17 @@ public DataType getAggregatedValueType() {

  @Override
  public Double getInitialAggregatedValue(Number rawValue) {
    if (rawValue == null) {
```
Can this ever be `null`?
It could be if the field is null on some of the incoming stream messages. You could always guard against this by filtering out the messages during ingestion, but I think returning 0.0 here as the default should be harmless since this is the sum aggregator
Currently the input should never be null (it should have already been filled with the default value). My concern is that we are adding null handling to only this aggregation but not others. In order to completely support null input, we need to allow null values in, annotate the input value as `@Nullable`, and support it for all aggregations. That is not in the scope of this PR, so I suggest doing it separately
@Jackie-Jiang, addressed your comments, please review again
Mostly good
```java
String log2mLit = arguments.get(1).getLiteral().getStringValue();
Preconditions.checkState(StringUtils.isNumeric(log2mLit), "log2m argument must be a numeric literal");

_log2m = Integer.parseInt(log2mLit);
```
(minor)

```diff
- String log2mLit = arguments.get(1).getLiteral().getStringValue();
- Preconditions.checkState(StringUtils.isNumeric(log2mLit), "log2m argument must be a numeric literal");
- _log2m = Integer.parseInt(log2mLit);
+ _log2m = arguments.get(1).getLiteral().getIntValue();
```
Doesn't this remove the check that it's numeric, though?
```java
Preconditions.checkState(StringUtils.isNumeric(log2mLit), "log2m argument must be a numeric literal");

_log2m = Integer.parseInt(log2mLit);
_log2mByteSize = (new HyperLogLog(_log2m)).getBytes().length;
```
We can add a util to get the byte size without serializing:

```java
byteSize = (RegisterSet.getSizeForCount(1 << log2m) + 2) * Integer.BYTES;
```
```java
}

/*
Aggregate with a optimal maximum precision in mind. Scale is always only 1 32-bit
```
(code format) We usually indent (add 2 spaces) the block comment
```java
String precision = arguments.get(1).getLiteral().getStringValue();
Preconditions.checkState(StringUtils.isNumeric(precision), "precision must be a numeric literal");

_fixedSize = BigDecimalUtils.byteSizeForFixedPrecision(Integer.parseInt(precision));
```
```diff
- String precision = arguments.get(1).getLiteral().getStringValue();
- Preconditions.checkState(StringUtils.isNumeric(precision), "precision must be a numeric literal");
- _fixedSize = BigDecimalUtils.byteSizeForFixedPrecision(Integer.parseInt(precision));
+ _fixedSize = BigDecimalUtils.byteSizeForFixedPrecision(arguments.get(1).getLiteral().getIntValue());
```
```diff
@@ -142,7 +143,8 @@ static class Record {
     for (AggregationFunctionColumnPair functionColumnPair : functionColumnPairs) {
       _metrics[index] = functionColumnPair.toColumnName();
       _functionColumnPairs[index] = functionColumnPair;
-      _valueAggregators[index] = ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType());
+      _valueAggregators[index] =
+          ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), Collections.EMPTY_LIST);
```
(nit)

```diff
- ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), Collections.EMPTY_LIST);
+ ValueAggregatorFactory.getValueAggregator(functionColumnPair.getFunctionType(), Collections.emptyList());
```
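For context on the nit above, a minimal standalone illustration (class name `EmptyListDemo` is made up for this sketch): `Collections.EMPTY_LIST` is a raw `List`, so assigning it to a parameterized type compiles only with an unchecked-conversion warning, while `Collections.emptyList()` infers the element type.

```java
import java.util.Collections;
import java.util.List;

// Why Collections.emptyList() is preferred over Collections.EMPTY_LIST:
// emptyList() is generically typed, so the assignment below is warning-free.
public class EmptyListDemo {
    static List<String> typedEmpty() {
        // Using Collections.EMPTY_LIST here would emit an unchecked warning,
        // because EMPTY_LIST is a raw List.
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        System.out.println(typedEmpty().size()); // prints 0
    }
}
```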
```java
List<ExpressionContext> arguments = functionContext.getArguments();

if (("distinctcounthll".equals(functionContext.getFunctionName()))
```
We need to use the canonical name (removing underscores). Currently if the function name is `distinct_count_hll` or `sum_precision` it will fail
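To make the suggestion above concrete, a hypothetical sketch (not the actual Pinot implementation): canonicalizing a function name means lower-casing it and stripping underscores, so `distinct_count_hll` and `DISTINCTCOUNTHLL` resolve to the same aggregator key.

```java
// Hypothetical helper illustrating function-name canonicalization.
public class CanonicalName {
    static String canonicalize(String functionName) {
        // Drop underscores and normalize case so spelling variants match.
        return functionName.replace("_", "").toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(canonicalize("distinct_count_hll")); // prints distinctcounthll
        System.out.println(canonicalize("SUM_PRECISION"));      // prints sumprecision
    }
}
```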
Resolved: ...ava/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteSVMutableForwardIndex.java
```java
@@ -269,13 +272,14 @@ public MutableIndex createMutableIndex(MutableIndexContext context, ForwardIndex
  String column = context.getFieldSpec().getName();
  String segmentName = context.getSegmentName();
  FieldSpec.DataType storedType = context.getFieldSpec().getDataType().getStoredType();
  int maxLength = context.getFieldSpec().getMaxLength();
```
(MAJOR) I don't think this is the correct way to pass this information. We can probably add the fixed length info into the `MutableIndexContext` to avoid modifying the field spec
```java
@@ -621,6 +631,10 @@ private void addNewRow(int docId, GenericRow row) {
  case DOUBLE:
    forwardIndex.add(((Number) value).doubleValue(), -1, docId);
    break;
  case BIG_DECIMAL:
```
The above comment no longer applies. We should probably add a comment about using byte[] to support `BIG_DECIMAL`. It works because `BIG_DECIMAL` is actually stored as byte[] underneath
@Jackie-Jiang pls re-review PR, the failed build looks like a flaky test
- Avoid HLL dependency in pinot-spi
- Simplify byte size computation for HLL
- Fix the table config validation logic. We allow type mismatch as long as it can be converted (e.g. numbers are compatible)
- Cleanup and reformat
Feature description

This PR expands realtime aggregation to support DISTINCTCOUNTHLL and SUMPRECISION (which supports Big Decimal).

DISTINCTCOUNTHLL, here's an example config in the realtime table: Here, `customer` is the field that we want to count uniquely; in this case it is a string holding the id of the customer. 12 is the log2m, which defines the accuracy of the HLL algorithm. The output field, `distinctcounthll_customer`, is the bytes form of that HLL object, which can then be queried as normal using the DISTINCTCOUNTHLL or DISTINCTCOUNTHLLRAW query operators. Here is the schema for the above:
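A sketch of what such an ingestion aggregation config could look like (the `ingestionConfig`/`aggregationConfigs` keys follow Pinot's table-config convention and are an assumption here, not copied verbatim from this PR; the column names come from the description above):

```json
{
  "ingestionConfig": {
    "aggregationConfigs": [
      {
        "columnName": "distinctcounthll_customer",
        "aggregationFunction": "DISTINCTCOUNTHLL(customer, 12)"
      }
    ]
  }
}
```

The matching schema would, under the same assumption, declare `distinctcounthll_customer` as a metric field of type BYTES so the serialized HLL can be stored in the segment.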
SUMPRECISION, for big decimal pre-aggregation, example: Here, `parsed_amount` is the input used to create a big decimal object of precision 38. Precision 38 is used to compute a byte size upper limit; this will be the maximum size, and it must be defined. While in theory big decimals can be unlimited, here we specify a maximum size upfront, so larger big decimals won't be supported. `sum_precision_parsed_amount` is the output that can be queried using the SUMPRECISION query operator. Here is the schema for the above:
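As with the HLL case, an illustrative sketch of the config (the config keys are an assumption based on Pinot's ingestion aggregation convention, not copied from this PR; the column names and precision come from the description above):

```json
{
  "ingestionConfig": {
    "aggregationConfigs": [
      {
        "columnName": "sum_precision_parsed_amount",
        "aggregationFunction": "SUMPRECISION(parsed_amount, 38)"
      }
    ]
  }
}
```

The corresponding schema would, under the same assumption, declare `sum_precision_parsed_amount` as a metric field of type BIG_DECIMAL, with the precision bounding the fixed byte size of each stored value.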
Both of these features use the FixedByteSVMutableForwardIndex under the hood.
Testing

I have added unit tests covering the FixedByteSVMutableForwardIndex and the serialization functions, as well as tests covering aggregation by writing data to the index and reading it back.