Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow using DISTINCTCOUNTTHETASKETCH with filter arguments in the multi-stage query engine #14285

Merged

Conversation

yashmayya
Copy link
Collaborator

  • Queries with an aggregation like DISTINCTCOUNTTHETASKETCH(trip_distance, 'nominalEntries=4096', 'VendorID=1', 'VendorID=2', 'SET_INTERSECT($1, $2)') currently fail with the following error on the multi-stage query engine: Invalid number of arguments to function 'DISTINCTCOUNTTHETASKETCH'. Was expecting 1 arguments.
  • This is because the Calcite operand type checker for the function was defined as OperandTypes.family(List.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER) with the second parameter being optional. However, the aggregation function can actually take any number of arguments greater than 1, see
    /**
    * The {@code DistinctCountThetaSketchAggregationFunction} can be used in 2 modes:
    * <ul>
    * <li>
    * Simple union without post-aggregation (1 or 2 arguments): main expression to aggregate on, optional theta-sketch
    * parameters
    * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col)
    * </li>
    * <li>
    * Union with post-aggregation (at least 4 arguments): main expression to aggregate on, theta-sketch parameters,
    * filter(s), post-aggregation expression
    * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, '', 'dimName=''gender'' AND dimValue=''male''',
    * 'dimName=''course'' AND dimValue=''math''', 'SET_INTERSECT($1,$2)')
    * </li>
    * </ul>
    * Currently, there are 3 parameters to the function:
    * <ul>
    * <li>
    * nominalEntries: The nominal entries used to create the sketch. (Default 4096)
    * samplingProbability: Sets the upfront uniform sampling probability, p. (Default 1.0)
    * accumulatorThreshold: How many sketches should be kept in memory before merging. (Default 2)
    * </li>
    * </ul>
    * <p>E.g. DISTINCT_COUNT_THETA_SKETCH(col, 'nominalEntries=8192')
    */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public class DistinctCountThetaSketchAggregationFunction
    extends BaseSingleInputAggregationFunction<List<ThetaSketchAccumulator>, Comparable> {
    private static final String SET_UNION = "setunion";
    private static final String SET_INTERSECT = "setintersect";
    private static final String SET_DIFF = "setdiff";
    private static final String DEFAULT_SKETCH_IDENTIFIER = "$0";
    private static final int DEFAULT_ACCUMULATOR_THRESHOLD = 2;
    private final List<ExpressionContext> _inputExpressions;
    private final boolean _includeDefaultSketch;
    private final List<FilterEvaluator> _filterEvaluators;
    private final ExpressionContext _postAggregationExpression;
    private final UpdateSketchBuilder _updateSketchBuilder = new UpdateSketchBuilder();
    private int _nominalEntries = ThetaUtil.DEFAULT_NOMINAL_ENTRIES;
    protected final SetOperationBuilder _setOperationBuilder = new SetOperationBuilder();
    protected int _accumulatorThreshold = DEFAULT_ACCUMULATOR_THRESHOLD;
    public DistinctCountThetaSketchAggregationFunction(List<ExpressionContext> arguments) {
    super(arguments.get(0));
    // Initialize the UpdateSketchBuilder and SetOperationBuilder with the parameters
    int numArguments = arguments.size();
    if (numArguments > 1) {
    ExpressionContext paramsExpression = arguments.get(1);
    Preconditions.checkArgument(paramsExpression.getType() == ExpressionContext.Type.LITERAL,
    "Second argument of DISTINCT_COUNT_THETA_SKETCH aggregation function must be literal (parameters)");
    Parameters parameters = new Parameters(paramsExpression.getLiteral().getStringValue());
    // Allows the user to trade-off memory usage for merge CPU; higher values use more memory
    _accumulatorThreshold = parameters.getAccumulatorThreshold();
    // Nominal entries controls sketch accuracy and size
    _nominalEntries = parameters.getNominalEntries();
    _updateSketchBuilder.setNominalEntries(_nominalEntries);
    _setOperationBuilder.setNominalEntries(_nominalEntries);
    // Sampling probability sets the initial value of Theta, defaults to 1.0
    float p = parameters.getSamplingProbability();
    _setOperationBuilder.setP(p);
    _updateSketchBuilder.setP(p);
    }
    if (numArguments < 4) {
    // Simple union without post-aggregation
    _inputExpressions = Collections.singletonList(_expression);
    _includeDefaultSketch = true;
    _filterEvaluators = Collections.emptyList();
    _postAggregationExpression = ExpressionContext.forIdentifier(DEFAULT_SKETCH_IDENTIFIER);
    } else {
    // Union with post-aggregation
    // Input expressions should include the main expression and the lhs of the predicates in the filters
    _inputExpressions = new ArrayList<>();
    _inputExpressions.add(_expression);
    Map<ExpressionContext, Integer> expressionIndexMap = new HashMap<>();
    expressionIndexMap.put(_expression, 0);
    // Process the filter expressions
    _filterEvaluators = new ArrayList<>(numArguments - 3);
    for (int i = 2; i < numArguments - 1; i++) {
    ExpressionContext filterExpression = arguments.get(i);
    Preconditions.checkArgument(filterExpression.getType() == ExpressionContext.Type.LITERAL,
    "Third to second last argument of DISTINCT_COUNT_THETA_SKETCH aggregation function must be literal "
    + "(filter expression)");
    FilterContext filter = RequestContextUtils.getFilter(
    CalciteSqlParser.compileToExpression(filterExpression.getLiteral().getStringValue()));
    Preconditions.checkArgument(!filter.isConstant(), "Filter must not be constant: %s", filter);
    // NOTE: Collect expressions before constructing the FilterInfo so that expressionIndexMap always include the
    // expressions in the filter.
    collectExpressions(filter, _inputExpressions, expressionIndexMap);
    _filterEvaluators.add(getFilterEvaluator(filter, expressionIndexMap));
    }
    // Process the post-aggregation expression
    ExpressionContext postAggregationExpression = arguments.get(numArguments - 1);
    Preconditions.checkArgument(postAggregationExpression.getType() == ExpressionContext.Type.LITERAL,
    "Last argument of DISTINCT_COUNT_THETA_SKETCH aggregation function must be literal (post-aggregation "
    + "expression)");
    Expression expr = CalciteSqlParser.compileToExpression(postAggregationExpression.getLiteral().getStringValue());
    _postAggregationExpression = RequestContextUtils.getExpression(expr);
    // Validate the post-aggregation expression
    _includeDefaultSketch = validatePostAggregationExpression(_postAggregationExpression, _filterEvaluators.size());
    }
    }
  • This patch fixes the Calcite operand type checker for the DISTINCTCOUNTTHETASKETCH and DISTINCTCOUNTRAWTHETASKETCH aggregation functions.

@yashmayya yashmayya added bugfix multi-stage Related to the multi-stage query engine labels Oct 23, 2024
@codecov-commenter
Copy link

codecov-commenter commented Oct 23, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 63.80%. Comparing base (59551e4) to head (34158ac).
Report is 1225 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14285      +/-   ##
============================================
+ Coverage     61.75%   63.80%   +2.05%     
- Complexity      207     1535    +1328     
============================================
  Files          2436     2629     +193     
  Lines        133233   145161   +11928     
  Branches      20636    22205    +1569     
============================================
+ Hits          82274    92616   +10342     
- Misses        44911    45717     +806     
- Partials       6048     6828     +780     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.78% <100.00%> (+2.07%) ⬆️
java-21 63.66% <100.00%> (+2.04%) ⬆️
skip-bytebuffers-false 63.79% <100.00%> (+2.05%) ⬆️
skip-bytebuffers-true 63.64% <100.00%> (+35.91%) ⬆️
temurin 63.80% <100.00%> (+2.05%) ⬆️
unittests 63.79% <100.00%> (+2.05%) ⬆️
unittests1 55.40% <100.00%> (+8.51%) ⬆️
unittests2 34.36% <0.00%> (+6.63%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@yashmayya yashmayya force-pushed the fix-distinctcountthetasketch-in-v2 branch from 14ba12f to 34158ac Compare October 23, 2024 17:19
@Jackie-Jiang Jackie-Jiang merged commit a7b6dd7 into apache:master Oct 23, 2024
21 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bugfix multi-stage Related to the multi-stage query engine
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants