Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FILTER Clauses for Aggregates #7916

Merged
merged 90 commits into from
Jan 31, 2022
Merged

Conversation

atris
Copy link
Contributor

@atris atris commented Dec 16, 2021

This PR implements support for FILTER clauses in aggregations:

SELECT SUM(COL1) FILTER(WHERE COL2 > 300), AVG(COL2) FILTER (WHERE COL2 < 50) FROM MyTable WHERE COL1 > 50;

The approach implements the swim lane design highlighted in the design document by splitting at the filter operator. The implementation gets the filter block for main predicate and each filter predicate, ANDs them together and returns a combined filter operator.

The main predicate is scanned only once and reused for all filter clauses.
The implementation allows each filter swim lane to use any available indices independently.

If two or more filter clauses have the same predicate, the result will be computed only once and fed to each of the aggregates.

https://docs.google.com/document/d/1ZM-2c0jJkbeJ61m8sJF0qj19t5UYLhnTFvIAz-HCJmk/edit?usp=sharing

Performance benchmark:

3 warm up iterations per run, 5 runs in total. Data set size -- 1.5 million documents. Apple M1 Pro, 32GB RAM

X axis represents number of iterations and Y axis represents latency in MS.

FILTER query, compared to its equivalent CASE query, is 120-140% faster on average.

image

GROUP BY is not supported yet and will be done in a follow up PR

Copy link
Member

@richardstartin richardstartin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code could be more concise in places and this would make the logic easier to follow.

@codecov-commenter
Copy link

codecov-commenter commented Dec 16, 2021

Codecov Report

Merging #7916 (bd19945) into master (0fe7ef8) will decrease coverage by 7.09%.
The diff coverage is 88.16%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #7916      +/-   ##
============================================
- Coverage     71.24%   64.15%   -7.10%     
+ Complexity     4262     4180      -82     
============================================
  Files          1607     1602       -5     
  Lines         83409    83206     -203     
  Branches      12458    12441      -17     
============================================
- Hits          59426    53380    -6046     
- Misses        19941    25979    +6038     
+ Partials       4042     3847     -195     
Flag Coverage Δ
integration1 28.97% <21.30%> (+<0.01%) ⬆️
integration2 27.63% <21.30%> (-0.04%) ⬇️
unittests1 67.97% <88.16%> (+0.10%) ⬆️
unittests2 ?

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...apache/pinot/core/operator/blocks/FilterBlock.java 50.00% <50.00%> (-7.15%) ⬇️
.../pinot/core/operator/docidsets/BitmapDocIdSet.java 62.50% <60.00%> (-37.50%) ⬇️
...t/core/operator/filter/CombinedFilterOperator.java 70.00% <70.00%> (ø)
...t/core/operator/docidsets/FilterBlockDocIdSet.java 72.72% <72.72%> (ø)
...inot/core/query/reduce/PostAggregationHandler.java 91.86% <87.50%> (-0.45%) ⬇️
...rg/apache/pinot/core/plan/AggregationPlanNode.java 90.90% <88.09%> (-2.08%) ⬇️
...re/operator/query/FilteredAggregationOperator.java 90.00% <90.00%> (ø)
...pinot/core/query/request/context/QueryContext.java 97.58% <97.77%> (-0.33%) ⬇️
...re/operator/docidsets/RangelessBitmapDocIdSet.java 100.00% <100.00%> (ø)
...ava/org/apache/pinot/core/plan/FilterPlanNode.java 89.21% <100.00%> (+2.34%) ⬆️
... and 225 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0fe7ef8...bd19945. Read the comment docs.

@@ -31,16 +32,29 @@

public class StarTreeDocIdSetPlanNode implements PlanNode {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I am seeing if the filterOperator is present, the existing implementation is completely overridden (new constructor and if statement in run method) to the point that the old code isn't being touched at all. I am wondering if it will be better to create a new class (for example StarTreeFilteredDocIdSetPlanNode) and doing that will also avoid the null checks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is pretty brief, and I would honestly avoid adding new plan nodes unless there is a major functionality that is different.

@MrNeocore
Copy link

Thanks you @atris !

@atris atris closed this Dec 24, 2021
@atris
Copy link
Contributor Author

atris commented Dec 24, 2021

@Jackie-Jiang is working on a parallel implementation, so closing this PR to avoid conflict

@atris atris reopened this Jan 4, 2022
@atris
Copy link
Contributor Author

atris commented Jan 4, 2022

Discussed with @Jackie-Jiang and we will be convening on this PR itself, so reviving it. Sorry for the confusion.

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall logic looks good. Let's try to further clean up the code

@atris atris force-pushed the hack_hack_filter_split branch from 6150f87 to 7d85659 Compare January 19, 2022 10:00
Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly good. Please reformat the changes using the Pinot Style (you might want to checkout master to import the latest checkstyle settings)

@@ -57,6 +57,7 @@ public StarTreeTransformPlanNode(StarTreeV2 starTreeV2,
_groupByExpressions = Collections.emptyList();
groupByColumns = null;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's revert this file since it is not relevant

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

boolean hasFilteredPredicates = _queryContext.isHasFilteredAggregations();
BaseOperator<IntermediateResultsBlock> aggOperator;
if (hasFilteredPredicates) {
aggOperator = buildFilteredAggOperator();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(minor) this part can be more concise by directly return instead of putting the operator in an local variable

* @param numTotalDocs Number of total docs
*/
private BaseOperator<IntermediateResultsBlock> buildOperatorForFilteredAggregations(
BaseFilterOperator mainPredicateFilterOperator,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The format still doesn't align with the pinot style

/**
* Builds the operator to be used for non filtered aggregations
*/
private BaseOperator<IntermediateResultsBlock> buildNonFilteredAggOperator() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant is to move the current code into this method, and implement buildFilteredAggOperator() separately. The reason being:

  1. The metadata/dictionary based operator and star-tree does not apply to the filtered aggregation
  2. Sharing buildOperators() method can bring extra overhead to non-filtered aggregations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -119,11 +122,13 @@ private QueryContext(String tableName, List<ExpressionContext> selectExpressions
@Nullable FilterContext filter, @Nullable List<ExpressionContext> groupByExpressions,
@Nullable FilterContext havingFilter, @Nullable List<OrderByExpressionContext> orderByExpressions, int limit,
int offset, Map<String, String> queryOptions, @Nullable Map<String, String> debugOptions,
BrokerRequest brokerRequest) {
BrokerRequest brokerRequest, boolean hasFilteredAggregations,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasFilteredAggregations should not be set through the constructor. It is updated in generateAggregationFunctions()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed.

@@ -350,6 +381,7 @@ public String toString() {
private List<ExpressionContext> _selectExpressions;
private List<String> _aliasList;
private FilterContext _filter;
private ExpressionContext _filterExpression;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change in the Builder is not necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@@ -441,76 +480,106 @@ public QueryContext build() {
*/
private void generateAggregationFunctions(QueryContext queryContext) {
List<AggregationFunction> aggregationFunctions = new ArrayList<>();
List<Pair<AggregationFunction, FilterContext>> filteredAggregationFunctions = new ArrayList<>();
List<Pair<FilterContext, AggregationFunction>> aggregationFunctionsWithMetadata = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename the variable


// Add aggregation functions in the SELECT clause
// NOTE: DO NOT deduplicate the aggregation functions in the SELECT clause because that involves protocol change.
List<FunctionContext> aggregationsInSelect = new ArrayList<>();
List<Pair<FunctionContext, FilterContext>> filteredAggregations = new ArrayList<>();
List<Pair<Pair<FilterContext, ExpressionContext>, FunctionContext>> aggregationsInSelect = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to keep ExpressionContext here. List<FunctionContext, FilterContext> should be enough for the following computations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@SuppressWarnings("rawtypes")
public class FilteredAggregationOperator extends BaseOperator<IntermediateResultsBlock> {
private static final String OPERATOR_NAME = "FilteredAggregationOperator";
private static final String EXPLAIN_NAME = "FILTERED_AGGREGATE";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To match the naming pattern in AggregateGroupByOperator (and other Aggregation*Operator classes), the EXPLAIN_NAME should be set to AGGREGATE_FILTERED.


@Override
public List<Operator> getChildOperators() {
return _aggFunctionsWithTransformOperator.stream().map(Pair::getRight).collect(Collectors.toList());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless this has recently changed, I think stream api usage is not consistent with Pinot coding convention.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't seen a coding convention mentioning the same, yet. Is this documented somewhere?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unaware of such a convention and have seen plenty of code using the streams API for performance non-critical operations (like this one) recently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Jackie-Jiang Can you please clarify if stream api usage applies?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid using stream api for performance critical operations. This one is at query path, but might not be that performance critical (only called once). Using regular api could give slightly better performance, but IMO both way is okay

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @richardstartin -- we only need to worry about streams when the code is invoked in a tight loop for multiple iterations -- none of which is applicable in this specific case

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Only minor and code format comments. Please apply the pinot format and auto-reformat the changed files

* @param numTotalDocs Number of total docs
*/
private BaseOperator<IntermediateResultsBlock> buildOperatorForFilteredAggregations(
BaseFilterOperator mainPredicateFilterOperator,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(code format) Please apply the pinot code format and use it to auto-reformat this file. Several changes do not comply with the format

@atris atris merged commit 6a82102 into apache:master Jan 31, 2022
@Jackie-Jiang Jackie-Jiang added feature release-notes Referenced by PRs that need attention when compiling the next release notes labels Mar 10, 2022
@Jackie-Jiang
Copy link
Contributor

@atris This is a great feature. Could you please add some release notes to the PR description which we can refer to when cutting the next release?

@MrNeocore
Copy link

MrNeocore commented Apr 7, 2022

Is IN_SUBQUERY inside FILTER supported ?

For example:

SELECT SUM(value) FILTER(WHERE IN_SUBQUERY(entityId, 'SELECT ID_SET(entityId) FROM other_table WHERE cond = <thing>') = 1) FROM table

I can't seem to make it work, but maybe I'm mistyping something
-> Unsupported function: insubquery not found

@Jackie-Jiang
Copy link
Contributor

No it is not supported. For the example you give, SELECT SUM(value) WHERE IN_SUBQUERY(entityId, 'SELECT ID_SET(entityId) FROM other_table WHERE cond = <thing>') = 1 FROM table should work

@MrNeocore
Copy link

MrNeocore commented Apr 9, 2022

Thanks for the confirmation @Jackie-Jiang

Yes that's what we're using right now, but we've got a use case where we may have hundreds of sub query entities, which are currently translated into hundreds of Pinot queries so I was looking for a way to improve that :)

@atris
Copy link
Contributor Author

atris commented Apr 9, 2022 via email

@MrNeocore
Copy link

Thanks for giving it a shot @atris

@kishoreg
Copy link
Member

@atris did we add this to our docs?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature release-notes Referenced by PRs that need attention when compiling the next release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants