Skip to content

Commit

Permalink
Use -Array variants of aggregates in schema_array_transformer (v2) (
Browse files Browse the repository at this point in the history
#1226)

Review fixes for: #1152

`schema_array_transformer` transforms the SQL query for `Array` columns.
Before this change, if an aggregation was performed on a `Array` column,
e.g. `sum(myArrayColumn)`, the transformer would change it into
`sum(arrayJoin(myArrayColumn))`.

However using `arrayJoin` function has problems - `arrayJoin` modifies
the result set of SQL query introducing additional rows. If there are
many `arrayJoin`s, a Cartesian product many rows will be performed: this
causes query slowdown and makes the result invalid (we don't actually
want to do a Cartesian product!).

Solve the problem by using `-Array` variants of aggregates (e.g.
`sumArray` instead of `sum(arrayJoin())`), which does not inflate the
number of result rows.

Note that this PR does NOT get rid of `arrayJoin()` fully in all cases.
There are panels that actually need it, such as "Top products this week"
in eCommerce dashboard, where we `GROUP BY` an array column.

<img width="1350" alt="Screenshot 2025-01-07 at 11 20 42"
src="https://github.com/user-attachments/assets/214890d5-c04a-4a6a-a683-5bffaf944d80"
/>

This remaining case should use the `ARRAY JOIN` operator, but this is
out-of-scope of this PR.

Closes #1152

---------

Co-authored-by: Piotr Grabowski <[email protected]>
  • Loading branch information
jakozaur and avelanarius authored Jan 24, 2025
1 parent 9bf027a commit 2661385
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 29 deletions.
101 changes: 73 additions & 28 deletions quesma/quesma/schema_array_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,52 @@ import (
//
//

type functionWithCombinator struct {
baseFunctionName string
isArray bool
isIf bool
isOrNull bool
isState bool
isMerge bool
}

func (f functionWithCombinator) String() string {
result := f.baseFunctionName
if f.isArray {
result = result + "Array"
}
if f.isIf {
result = result + "If"
}
if f.isOrNull {
result = result + "OrNull"
}
if f.isState {
result = result + "State"
}
if f.isMerge {
result = result + "Merge"
}
return result
}

func parseFunctionWithCombinator(funcName string) (result functionWithCombinator) {
stripSuffix := func(s string, suffix string) (string, bool) {
if strings.HasSuffix(s, suffix) {
return strings.TrimSuffix(s, suffix), true
}
return s, false
}

result.baseFunctionName = funcName
result.baseFunctionName, result.isState = stripSuffix(result.baseFunctionName, "State")
result.baseFunctionName, result.isMerge = stripSuffix(result.baseFunctionName, "Merge")
result.baseFunctionName, result.isIf = stripSuffix(result.baseFunctionName, "If")
result.baseFunctionName, result.isOrNull = stripSuffix(result.baseFunctionName, "OrNull")

return result
}

type arrayTypeResolver struct {
indexSchema schema.Schema
}
Expand Down Expand Up @@ -73,6 +119,7 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) model.ExprVisitor {

}

var childGotArrayFunc bool
visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, e model.FunctionExpr) interface{} {

if len(e.Args) > 0 {
Expand All @@ -81,23 +128,38 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) model.ExprVisitor {
if ok {
dbType := resolver.dbColumnType(column.ColumnName)
if strings.HasPrefix(dbType, "Array") {
if strings.HasPrefix(e.Name, "sum") {
// here we apply -Array combinator to the sum function
// https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators#-array
//
// TODO this can be rewritten to transform all aggregate functions as well
//
e.Name = strings.ReplaceAll(e.Name, "sum", "sumArray")
} else {
logger.Error().Msgf("Unhandled array function %s, column %v (%v)", e.Name, column.ColumnName, dbType)
}
funcParsed := parseFunctionWithCombinator(e.Name)
funcParsed.isArray = true
childGotArrayFunc = true
e.Name = funcParsed.String()
}
} else {
e.Args = b.VisitChildren(e.Args)
}
}

return model.NewFunction(e.Name, e.Args...)
}

visitor.OverrideVisitWindowFunction = func(b *model.BaseExprVisitor, e model.WindowFunction) interface{} {
childGotArrayFunc = false
args := b.VisitChildren(e.Args)
return model.NewFunction(e.Name, args...)
if childGotArrayFunc {
funcParsed := parseFunctionWithCombinator(e.Name)
funcParsed.isArray = true
e.Name = funcParsed.String()
}
return model.NewWindowFunction(e.Name, args, e.PartitionBy, e.OrderBy)
}

visitor.OverrideVisitColumnRef = func(b *model.BaseExprVisitor, e model.ColumnRef) interface{} {
dbType := resolver.dbColumnType(e.ColumnName)
if strings.HasPrefix(dbType, "Array") {
logger.Error().Msgf("Unhandled array column ref %v (%v)", e.ColumnName, dbType)
}
return e
}

return visitor
}

Expand Down Expand Up @@ -148,23 +210,6 @@ func checkIfGroupingByArrayColumn(selectCommand model.SelectCommand, resolver ar
return &e
}

visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, e model.FunctionExpr) interface{} {

if strings.HasPrefix(e.Name, "sum") || strings.HasPrefix(e.Name, "count") {

if len(e.Args) > 0 {
arg := e.Args[0]

if isArrayColumn(arg) {
found = true
}

}

}
return e
}

selectCommand.Accept(visitor)

return found
Expand Down
4 changes: 4 additions & 0 deletions quesma/quesma/schema_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,10 @@ func columnsToAliasedColumns(columns []model.Expr) []model.Expr {
aliasedColumns[i] = model.NewAliasedExpr(column, fmt.Sprintf("column_%d", i))
continue
}
if _, ok := column.(model.WindowFunction); ok {
aliasedColumns[i] = model.NewAliasedExpr(column, fmt.Sprintf("column_%d", i))
continue
}

aliasedColumns[i] = model.NewAliasedExpr(column, fmt.Sprintf("column_%d", i))
logger.Error().Msgf("Quesma internal error - unreachable code: unsupported column type %T", column)
Expand Down
32 changes: 31 additions & 1 deletion quesma/quesma/schema_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,37 @@ func Test_arrayType(t *testing.T) {
FromClause: model.NewTableRef("kibana_sample_data_ecommerce"),
Columns: []model.Expr{
model.NewColumnRef("order_date"),
model.NewAliasedExpr(model.NewFunction("sumOrNull", model.NewFunction("arrayJoin", model.NewColumnRef("products_quantity"))), "column_1"),
model.NewAliasedExpr(model.NewFunction("sumArrayOrNull", model.NewColumnRef("products_quantity")), "column_1"),
},
GroupBy: []model.Expr{model.NewColumnRef("order_date")},
},
},
},

{
name: "arrayReducePancake",
//SELECT "order_date", avgOrNullMerge(avgOrNullState("products::quantity"")) OVER (), sumOrNull("products::quantity") FROM "kibana_sample_data_ecommerce" GROUP BY "order_date"
query: &model.Query{
TableName: "kibana_sample_data_ecommerce",
SelectCommand: model.SelectCommand{
FromClause: model.NewTableRef("kibana_sample_data_ecommerce"),
Columns: []model.Expr{
model.NewColumnRef("order_date"),
model.NewWindowFunction("avgOrNullMerge", []model.Expr{model.NewFunction("avgOrNullState", model.NewColumnRef("products.quantity"))}, []model.Expr{}, []model.OrderByExpr{}),
model.NewFunction("sumOrNull", model.NewColumnRef("products.quantity")),
},
GroupBy: []model.Expr{model.NewColumnRef("order_date")},
},
},
//SELECT "order_date", avgArrayOrNullMerge(avgArrayOrNullMerge("products::quantity"")) OVER (), sumOrNull("products::quantity") FROM "kibana_sample_data_ecommerce" GROUP BY "order_date"
expected: &model.Query{
TableName: "kibana_sample_data_ecommerce",
SelectCommand: model.SelectCommand{
FromClause: model.NewTableRef("kibana_sample_data_ecommerce"),
Columns: []model.Expr{
model.NewColumnRef("order_date"),
model.NewAliasedExpr(model.NewWindowFunction("avgArrayOrNullMerge", []model.Expr{model.NewFunction("avgArrayOrNullState", model.NewColumnRef("products_quantity"))}, []model.Expr{}, []model.OrderByExpr{}), "column_1"),
model.NewAliasedExpr(model.NewFunction("sumArrayOrNull", model.NewColumnRef("products_quantity")), "column_2"),
},
GroupBy: []model.Expr{model.NewColumnRef("order_date")},
},
Expand Down

0 comments on commit 2661385

Please sign in to comment.