1 simple painless script in terms aggregation (#1196)
First PR fine-tuning Quesma for the new customer
trzysiek authored Jan 26, 2025
1 parent c21d75d commit b0d8343
Showing 4 changed files with 178 additions and 13 deletions.
29 changes: 21 additions & 8 deletions quesma/model/bucket_aggregations/terms.go
@@ -171,7 +171,7 @@ func (query Terms) UpdateFieldForIncludeAndExclude(field model.Expr) (updatedFie
 // TODO add bad requests to tests
 // Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
 func CheckParamsTerms(ctx context.Context, paramsRaw any) error {
-	requiredParams := map[string]string{"field": "string"}
+	eitherRequired := map[string]string{"field": "string", "script": "map"}
 	optionalParams := map[string]string{
 		"size":       "float64|string", // TODO should be int|string, will be fixed
 		"shard_size": "float64",        // TODO should be int, will be fixed
@@ -197,19 +197,32 @@ func CheckParamsTerms(ctx context.Context, paramsRaw any) error {
 	}
 
 	// check if required are present
-	for paramName, paramType := range requiredParams {
-		paramVal, exists := params[paramName]
-		if !exists {
-			return fmt.Errorf("required parameter %s not found in Terms params", paramName)
+	nrOfRequired := 0
+	for paramName := range eitherRequired {
+		if _, exists := params[paramName]; exists {
+			nrOfRequired++
 		}
-		if reflect.TypeOf(paramVal).Name() != paramType { // TODO I'll make a small rewrite to not use reflect here
-			return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
-		}
 	}
+	if nrOfRequired != 1 {
+		return fmt.Errorf("expected exactly one of %v in Terms params %v", eitherRequired, params)
+	}
+	if field, exists := params["field"]; exists {
+		if _, isString := field.(string); !isString {
+			return fmt.Errorf("field is not a string, but %T", field)
+		}
+	} else {
+		_, hasInclude := params["include"]
+		_, hasExclude := params["exclude"]
+		_, hasMissing := params["missing"]
+		if hasInclude || hasExclude || hasMissing {
+			return fmt.Errorf("field is missing, but include/exclude/missing are present in Terms params %v", params)
+		}
+		// TODO check script's type as well
+	}
 
 	// check if only required/optional are present
 	for paramName := range params {
-		if _, isRequired := requiredParams[paramName]; !isRequired {
+		if _, isRequired := eitherRequired[paramName]; !isRequired {
 			wantedType, isOptional := optionalParams[paramName]
 			if !isOptional {
 				return fmt.Errorf("unexpected parameter %s found in Terms params %v", paramName, params)
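In short, the new validation accepts exactly one of "field" and "script". A minimal, self-contained sketch of that rule (simplified from the diff above; it omits the per-parameter type checks, and is not the repository code):

    package main

    import "fmt"

    // checkEitherRequired mirrors the "exactly one of field/script" check.
    func checkEitherRequired(params map[string]any) error {
        nrOfRequired := 0
        for _, name := range []string{"field", "script"} {
            if _, exists := params[name]; exists {
                nrOfRequired++
            }
        }
        if nrOfRequired != 1 {
            return fmt.Errorf("expected exactly one of [field script] in Terms params %v", params)
        }
        return nil
    }

    func main() {
        fmt.Println(checkEitherRequired(map[string]any{"field": "status"}))                        // <nil>
        fmt.Println(checkEitherRequired(map[string]any{"script": map[string]any{}}))               // <nil>
        fmt.Println(checkEitherRequired(map[string]any{}))                                         // error: neither present
        fmt.Println(checkEitherRequired(map[string]any{"field": "s", "script": map[string]any{}})) // error: both present
    }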
12 changes: 10 additions & 2 deletions quesma/queryparser/aggregation_parser.go
@@ -331,7 +331,7 @@ func (cw *ClickhouseQueryTranslator) parseFieldFieldMaybeScript(shouldBeMap any,
 	// maybe "field" field
 	if fieldRaw, ok := Map["field"]; ok {
 		if field, ok := fieldRaw.(string); ok {
-			return model.NewColumnRef(ResolveField(cw.Ctx, field, cw.Schema)), true // remove this resolve? we do all transforms after parsing is done?
+			return model.NewColumnRef(ResolveField(cw.Ctx, field, cw.Schema)), false // remove this resolve? we do all transforms after parsing is done?
 		} else {
 			logger.WarnWithCtx(cw.Ctx).Msgf("field is not a string, but %T, value: %v", fieldRaw, fieldRaw)
 		}
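The lone true-to-false flip above is subtle: judging from the new call site in pancake_aggregation_parser_buckets.go below, the second return value of parseFieldFieldMaybeScript now appears to mean "this expression came from a script" rather than "parsing succeeded". A hypothetical sketch of that assumed contract (names and shapes illustrative, not Quesma's API):

    package main

    import "fmt"

    // Assumed contract after this commit: the bool reports the expression's
    // origin (script vs plain field), not parsing success. Illustrative only.
    func parseFieldMaybeScript(m map[string]any) (expr string, isFromScript bool) {
        if field, ok := m["field"].(string); ok {
            return field, false // plain "field": not script-derived
        }
        if _, ok := m["script"]; ok {
            return "<expr parsed from painless source>", true
        }
        return "", false
    }

    func main() {
        fmt.Println(parseFieldMaybeScript(map[string]any{"field": "status"}))          // status false
        fmt.Println(parseFieldMaybeScript(map[string]any{"script": map[string]any{}})) // <expr parsed from painless source> true
    }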
@@ -366,12 +366,20 @@ func (cw *ClickhouseQueryTranslator) parseFieldFromScriptField(queryMap QueryMap
 		logger.WarnWithCtx(cw.Ctx).Msgf("source is not a string, but %T, value: %v", sourceRaw, sourceRaw)
 	}
 
-	// source must look like "doc['field_name'].value.getHour()" or "doc['field_name'].value.hourOfDay"
+	// a) source must look like "doc['field_name'].value.getHour()" or "doc['field_name'].value.hourOfDay"
 	wantedRegex := regexp.MustCompile(`^doc\['(\w+)']\.value\.(?:getHour\(\)|hourOfDay)$`)
 	matches := wantedRegex.FindStringSubmatch(source)
 	if len(matches) == 2 {
 		return model.NewFunction("toHour", model.NewColumnRef(matches[1])), true
 	}
+
+	// b) source must look like "if (doc['field_name_1'].value == doc['field_name_2'].value) { return 1; } else { return 0; }"
+	wantedRegex = regexp.MustCompile(`^if \(doc\['(.*)\.value']\.value == doc\['(.*)\.value'].value\) \{ \n return 1; \n} else \{ \n return 0; \n}$`)
+	matches = wantedRegex.FindStringSubmatch(source)
+	if len(matches) == 3 {
+		return model.NewInfixExpr(model.NewColumnRef(matches[1]), "=", model.NewColumnRef(matches[2])), true
+	}
 
 	return
 }

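The new branch b) only recognizes this exact equality-check shape. A standalone check that the regex really extracts both column names from the painless source used in the test case added below (note the "\n" in the JSON become real newlines after decoding, which the regex's \n matches):

    package main

    import (
        "fmt"
        "regexp"
    )

    func main() {
        // The painless source as it arrives after JSON decoding.
        source := "if (doc['request_id.value'].value == doc['origin_request_id.value'].value) { \n return 1; \n} else { \n return 0; \n}"

        re := regexp.MustCompile(`^if \(doc\['(.*)\.value']\.value == doc\['(.*)\.value'].value\) \{ \n return 1; \n} else \{ \n return 0; \n}$`)
        if m := re.FindStringSubmatch(source); len(m) == 3 {
            fmt.Printf("left: %s, right: %s\n", m[1], m[2]) // left: request_id, right: origin_request_id
        }
    }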
11 changes: 8 additions & 3 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
@@ -163,9 +163,14 @@ func (cw *ClickhouseQueryTranslator) parseTermsAggregation(aggregation *pancakeA
 	)
 
 	var didWeAddMissing, didWeUpdateFieldHere bool
-	field := cw.parseFieldField(params, aggrName)
-	field, didWeAddMissing = cw.addMissingParameterIfPresent(field, params)
-	field, didWeUpdateFieldHere = terms.UpdateFieldForIncludeAndExclude(field)
+	field, isFromScript := cw.parseFieldFieldMaybeScript(params, aggrName)
+	if !isFromScript {
+		// We currently don't support both 'script' and any of ['include', 'exclude', 'missing'] at the same time
+		// as it's not completely obvious how to handle it. Let's wait for a use case.
+		// (we'll see it in logs if it happens, because of CheckParamsTerms above)
+		field, didWeAddMissing = cw.addMissingParameterIfPresent(field, params)
+		field, didWeUpdateFieldHere = terms.UpdateFieldForIncludeAndExclude(field)
+	}
 
 	// If we updated above, we change our select to if(condition, field, NULL), so we also need to filter out those NULLs later
 	if !didWeAddMissing || didWeUpdateFieldHere {
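When the key does come from a script, the expression returned by parseFieldFromScriptField is used directly as the terms group-by key. A toy sketch of the rendering (assumed shape, not Quesma's actual model package) showing how the infix expression surfaces in the expected SQL of the test below:

    package main

    import "fmt"

    // Toy stand-ins for model.NewInfixExpr / model.NewColumnRef; this only
    // illustrates the resulting SQL fragment.
    type infix struct{ left, op, right string }

    func (e infix) render() string { return fmt.Sprintf("%q%s%q", e.left, e.op, e.right) }

    func main() {
        e := infix{"request_id", "=", "origin_request_id"}
        fmt.Println(e.render()) // "request_id"="origin_request_id", the terms key in the SQL below
    }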
139 changes: 139 additions & 0 deletions quesma/testdata/dates.go
@@ -828,4 +828,143 @@ var AggregationTestsWithDates = []AggregationTestCase{
   AS "aggr__sales_per_year__key_0"
 ORDER BY "aggr__sales_per_year__key_0" ASC`,
 	},
+	{ // [7]
+		TestName: "turing 1 - painless script in terms",
+		QueryRequestJson: `
+		{
+			"_source": {
+				"excludes": []
+			},
+			"aggs": {
+				"1": {
+					"aggs": {
+						"2": {
+							"terms": {
+								"order": {
+									"_count": "desc"
+								},
+								"script": {
+									"lang": "painless",
+									"source": "if (doc['request_id.value'].value == doc['origin_request_id.value'].value) { \n return 1; \n} else { \n return 0; \n}"
+								},
+								"shard_size": 25,
+								"size": 5,
+								"value_type": "boolean"
+							}
+						}
+					},
+					"date_histogram": {
+						"field": "@timestamp",
+						"fixed_interval": "30d",
+						"min_doc_count": 1,
+						"time_zone": "Europe/Warsaw"
+					}
+				}
+			},
+			"script_fields": {
+				"is_initial_request": {
+					"script": {
+						"lang": "painless",
+						"source": "if (doc['request_id.value'].value == doc['origin_request_id.value'].value) { \n return 1; \n} else { \n return 0; \n}"
+					}
+				}
+			},
+			"size": 0,
+			"track_total_hits": true
+		}`,
+		ExpectedResponse: `
+		{
+			"aggregations": {
+				"1": {
+					"buckets": [
+						{
+							"2": {
+								"doc_count_error_upper_bound": 0,
+								"sum_other_doc_count": 0,
+								"buckets": [
+									{
+										"key": 1,
+										"key_as_string": "true",
+										"doc_count": 1635
+									},
+									{
+										"key": 0,
+										"key_as_string": "false",
+										"doc_count": 50
+									}
+								]
+							},
+							"key_as_string": "2024-12-12T23:00:00.000",
+							"key": 1734044400000,
+							"doc_count": 1685
+						},
+						{
+							"2": {
+								"doc_count_error_upper_bound": 0,
+								"sum_other_doc_count": 0,
+								"buckets": [
+									{
+										"key": 1,
+										"key_as_string": "true",
+										"doc_count": 6844
+									}
+								]
+							},
+							"key_as_string": "2025-01-11T23:00:00.000",
+							"key": 1736636400000,
+							"doc_count": 6844
+						}
+					]
+				}
+			}
+		}`,
+		ExpectedPancakeResults: []model.QueryResultRow{
+			{Cols: []model.QueryResultCol{
+				model.NewQueryResultCol("aggr__1__key_0", int64(1734054400000/2592000000)),
+				model.NewQueryResultCol("aggr__1__count", int64(1685)),
+				model.NewQueryResultCol("aggr__1__2__parent_count", int64(1685)),
+				model.NewQueryResultCol("aggr__1__2__key_0", true),
+				model.NewQueryResultCol("aggr__1__2__count", int64(1635)),
+			}},
+			{Cols: []model.QueryResultCol{
+				model.NewQueryResultCol("aggr__1__key_0", int64(1734054400000/2592000000)),
+				model.NewQueryResultCol("aggr__1__count", int64(1685)),
+				model.NewQueryResultCol("aggr__1__2__parent_count", int64(1685)),
+				model.NewQueryResultCol("aggr__1__2__key_0", false),
+				model.NewQueryResultCol("aggr__1__2__count", int64(50)),
+			}},
+			{Cols: []model.QueryResultCol{
+				model.NewQueryResultCol("aggr__1__key_0", int64(1736646400000/2592000000)),
+				model.NewQueryResultCol("aggr__1__count", int64(6844)),
+				model.NewQueryResultCol("aggr__1__2__parent_count", int64(6844)),
+				model.NewQueryResultCol("aggr__1__2__key_0", true),
+				model.NewQueryResultCol("aggr__1__2__count", int64(6844)),
+			}},
+		},
+		ExpectedPancakeSQL: `
+SELECT "aggr__1__key_0", "aggr__1__count", "aggr__1__2__parent_count",
+  "aggr__1__2__key_0", "aggr__1__2__count"
+FROM (
+  SELECT "aggr__1__key_0", "aggr__1__count", "aggr__1__2__parent_count",
+    "aggr__1__2__key_0", "aggr__1__2__count",
+    dense_rank() OVER (ORDER BY "aggr__1__key_0" ASC) AS "aggr__1__order_1_rank",
+    dense_rank() OVER (PARTITION BY "aggr__1__key_0" ORDER BY
+    "aggr__1__2__count" DESC, "aggr__1__2__key_0" ASC) AS
+    "aggr__1__2__order_1_rank"
+  FROM (
+    SELECT toInt64((toUnixTimestamp64Milli("@timestamp")+timeZoneOffset(
+      toTimezone("@timestamp", 'Europe/Warsaw'))*1000) / 2592000000) AS
+      "aggr__1__key_0",
+      sum(count(*)) OVER (PARTITION BY "aggr__1__key_0") AS "aggr__1__count",
+      sum(count(*)) OVER (PARTITION BY "aggr__1__key_0") AS
+      "aggr__1__2__parent_count",
+      "request_id"="origin_request_id" AS "aggr__1__2__key_0",
+      count(*) AS "aggr__1__2__count"
+    FROM __quesma_table_name
+    GROUP BY toInt64((toUnixTimestamp64Milli("@timestamp")+timeZoneOffset(
+      toTimezone("@timestamp", 'Europe/Warsaw'))*1000) / 2592000000) AS
+      "aggr__1__key_0", "request_id"="origin_request_id" AS "aggr__1__2__key_0"))
+WHERE "aggr__1__2__order_1_rank"<=6
+ORDER BY "aggr__1__order_1_rank" ASC, "aggr__1__2__order_1_rank" ASC`,
+	},
 }
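A quick sanity check on the numbers in the new test: the date_histogram keys in ExpectedResponse follow from the bucket indices in ExpectedPancakeResults, assuming key = bucket_index * interval - tz_offset (a formula inferred from the data itself, not taken from Quesma's response-rendering code):

    package main

    import "fmt"

    func main() {
        const intervalMs = int64(2_592_000_000) // "30d" fixed_interval in milliseconds
        const warsawMs = int64(3_600_000)       // Europe/Warsaw is UTC+1 in winter

        bucket := int64(1734054400000) / intervalMs // 669, aggr__1__key_0 of the first row
        fmt.Println(bucket*intervalMs - warsawMs)   // 1734044400000 -> "2024-12-12T23:00:00.000"

        bucket = int64(1736646400000) / intervalMs // 670, aggr__1__key_0 of the third row
        fmt.Println(bucket*intervalMs - warsawMs)  // 1736636400000 -> "2025-01-11T23:00:00.000"
    }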
