Skip to content

Commit

Permalink
More cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
atris committed Jan 19, 2022
1 parent be3c695 commit 7d85659
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,14 @@ public Operator<IntermediateResultsBlock> run() {
assert _queryContext.getAggregationFunctions() != null;

boolean hasFilteredPredicates = _queryContext.isHasFilteredAggregations();

Pair<FilterPlanNode, BaseFilterOperator> filterOperatorPair =
buildFilterOperator(_queryContext.getFilter());

Pair<TransformOperator,
BaseOperator<IntermediateResultsBlock>> pair =
buildOperators(filterOperatorPair.getRight(), filterOperatorPair.getLeft());

BaseOperator<IntermediateResultsBlock> aggOperator;
if (hasFilteredPredicates) {
int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();

return buildOperatorForFilteredAggregations(filterOperatorPair.getRight(), pair.getLeft(),
numTotalDocs);
aggOperator = buildFilteredAggOperator();
} else {
aggOperator = buildNonFilteredAggOperator();
}

return pair.getRight();
return aggOperator;
}

/**
Expand Down Expand Up @@ -147,64 +139,40 @@ private BaseOperator<IntermediateResultsBlock> buildOperatorForFilteredAggregati
for (Pair<FilterContext, AggregationFunction> inputPair : aggregationFunctions) {
if (inputPair.getLeft() != null) {
FilterContext currentFilterExpression = inputPair.getLeft();

/*ExpressionContext currentFilterExpression = filterableAggregationFunction
.getAssociatedExpressionContext();
if (expressionContextToAggFuncsMap.get(currentFilterExpression) != null) {
expressionContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(aggregationFunction);
continue;
}*/

if (filterContextToAggFuncsMap.get(currentFilterExpression) != null) {
filterContextToAggFuncsMap.get(currentFilterExpression).getLeft().add(inputPair.getRight());
continue;
}

Pair<FilterPlanNode, BaseFilterOperator> pair =
buildFilterOperator(currentFilterExpression);

BaseFilterOperator wrappedFilterOperator = new CombinedFilterOperator(mainPredicateFilterOperator,
pair.getRight());

Pair<TransformOperator,
BaseOperator<IntermediateResultsBlock>> innerPair =
buildOperators(wrappedFilterOperator, pair.getLeft());


// For each transform operator, associate it with the underlying expression. This allows
// fetching the relevant TransformOperator when resolving blocks during aggregation
// execution

List aggFunctionList = new ArrayList<>();

aggFunctionList.add(inputPair.getRight());

/*expressionContextToAggFuncsMap.put(currentFilterExpression,
Pair.of(aggFunctionList, innerPair.getLeft()));*/
filterContextToAggFuncsMap.put(currentFilterExpression,
Pair.of(aggFunctionList, innerPair.getLeft()));
} else {
nonFilteredAggregationFunctions.add(inputPair.getRight());
}
}

List<Pair<AggregationFunction[], TransformOperator>> aggToTransformOpList =
new ArrayList<>();

// Convert to array since FilteredAggregationOperator expects it
for (Pair<List<AggregationFunction>, TransformOperator> pair
: filterContextToAggFuncsMap.values()) {
List<AggregationFunction> aggregationFunctionList = pair.getLeft();

if (aggregationFunctionList == null) {
throw new IllegalStateException("Null aggregation list seen");
}

aggToTransformOpList.add(Pair.of(aggregationFunctionList.toArray(new AggregationFunction[0]),
pair.getRight()));
}

aggToTransformOpList.add(Pair.of(nonFilteredAggregationFunctions.toArray(new AggregationFunction[0]),
mainTransformOperator));

Expand Down Expand Up @@ -296,4 +264,35 @@ BaseOperator<IntermediateResultsBlock>> buildOperators(BaseFilterOperator filter

return Pair.of(transformOperator, aggregationOperator);
}

/**
* Builds the operator to be used for non filtered aggregations
*/
private BaseOperator<IntermediateResultsBlock> buildNonFilteredAggOperator() {
Pair<FilterPlanNode, BaseFilterOperator> filterOperatorPair =
buildFilterOperator(_queryContext.getFilter());

Pair<TransformOperator,
BaseOperator<IntermediateResultsBlock>> pair =
buildOperators(filterOperatorPair.getRight(), filterOperatorPair.getLeft());

return pair.getRight();
}

/**
* Build the operator to be used for filtered aggregations
*/
private BaseOperator<IntermediateResultsBlock> buildFilteredAggOperator() {
// Build the operator chain for the main predicate
Pair<FilterPlanNode, BaseFilterOperator> filterOperatorPair =
buildFilterOperator(_queryContext.getFilter());
Pair<TransformOperator,
BaseOperator<IntermediateResultsBlock>> pair =
buildOperators(filterOperatorPair.getRight(), filterOperatorPair.getLeft());

int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();

return buildOperatorForFilteredAggregations(filterOperatorPair.getRight(), pair.getLeft(),
numTotalDocs);
}
}
Loading

0 comments on commit 7d85659

Please sign in to comment.