Skip to content

Commit

Permalink
Merge pull request #695 from Altinity/issue-430
Browse files Browse the repository at this point in the history
add `$columnsMs` macro,
  • Loading branch information
Slach authored Dec 19, 2024
2 parents 3025cc5 + 30d4c65 commit 2bddea7
Show file tree
Hide file tree
Showing 12 changed files with 571 additions and 279 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
* Add single stat panel with categories, fix https://github.com/Altinity/clickhouse-grafana/issues/403
* Add log context windows size to connection settings, fix https://github.com/Altinity/clickhouse-grafana/issues/657
* Add `X-ClickHouse-SSL-Certificate-Auth` support, fix https://github.com/Altinity/clickhouse-grafana/issues/580
* Add `$columnsMs` macro, fix https://github.com/Altinity/clickhouse-grafana/issues/430

## Fixes:
* Add transposed table example, fix https://github.com/Altinity/clickhouse-grafana/issues/404

# 3.3.0 (2024-12-02)
## Enhancements:
Expand Down
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,43 @@ This will help to build the next graph:

![req_by_os image](https://github.com/Altinity/clickhouse-grafana/raw/master/.github/images/08_requests_by_os.png)

---
### $columnsMs(key, value) - same as $columns but for time series with ms

Example usage:

```sql
$columnsMs(OSName, count(*) c)
FROM requests
INNER JOIN oses USING (OS)
```

Query will be transformed into:

```sql
SELECT
t,
groupArray((OSName, c)) AS groupArr
FROM
(
SELECT
$timeSeriesMs AS t,
OSName,
count(*) AS c
FROM requests
INNER JOIN oses USING (OS)
WHERE ((EventDate >= toDate(1482796627)) AND (EventDate <= toDate(1482853383))) AND ((EventTime >= toDateTime64(1482796627,3)) AND (EventTime <= toDateTime64(1482853383,3)))
GROUP BY
t,
OSName
ORDER BY
t,
OSName
)
GROUP BY t
ORDER BY t
```

---

### $rateColumns(key, value) - is a combination of $columns and $rate
Expand Down
68 changes: 50 additions & 18 deletions pkg/eval_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ func (q *EvalQuery) applyMacros(query string, ast *EvalAST) (string, error) {
if q.contain(ast, "$columns") {
return q.columns(query, ast)
}
if q.contain(ast, "$columnsMs") {
return q.columnsMs(query, ast)
}
if q.contain(ast, "$rateColumnsAggregated") {
return q.rateColumnsAggregated(query, ast)
}
Expand Down Expand Up @@ -374,10 +377,26 @@ func (q *EvalQuery) columns(query string, ast *EvalAST) (string, error) {
if args == nil || len(args) != 2 {
return "", fmt.Errorf("amount of arguments must equal 2 for $columns func. Parsed arguments are: %v", ast.Obj["$columns"])
}
return q._columns(args[0].(string), args[1].(string), beforeMacrosQuery, fromQuery)
return q._columns(args[0].(string), args[1].(string), beforeMacrosQuery, fromQuery, false)
}

func (q *EvalQuery) columnsMs(query string, ast *EvalAST) (string, error) {
macroQueries, err := q._parseMacro("$columnsMs", query)
if err != nil {
return "", err
}
beforeMacrosQuery, fromQuery := macroQueries[0], macroQueries[1]
if len(fromQuery) < 1 {
return query, nil
}
args := ast.Obj["$columnsMs"].(*EvalAST).Arr
if args == nil || len(args) != 2 {
return "", fmt.Errorf("amount of arguments must equal 2 for $columnsMs func. Parsed arguments are: %v", ast.Obj["$columnsMs"])
}
return q._columns(args[0].(string), args[1].(string), beforeMacrosQuery, fromQuery, true)
}

func (q *EvalQuery) _columns(key, value, beforeMacrosQuery, fromQuery string) (string, error) {
func (q *EvalQuery) _columns(key, value, beforeMacrosQuery, fromQuery string, useMs bool) (string, error) {
if key[len(key)-1] == ')' || value[len(value)-1] == ')' {
return "", fmt.Errorf("some of passed arguments are without aliases: %s, %s", key, value)
}
Expand Down Expand Up @@ -418,13 +437,16 @@ func (q *EvalQuery) _columns(key, value, beforeMacrosQuery, fromQuery string) (s
fromQuery = fromQuery[0 : groupByIndex-1]
}
}
fromQuery = q._applyTimeFilter(fromQuery)

fromQuery = q._applyTimeFilter(fromQuery, useMs)
timeSeriesMacro := "$timeSeries"
if useMs {
timeSeriesMacro = "$timeSeriesMs"
}
return beforeMacrosQuery + "SELECT" +
" t," +
" groupArray((" + keyAlias + ", " + valueAlias + ")) AS groupArr" +
" FROM (" +
" SELECT $timeSeries AS t" +
" SELECT " + timeSeriesMacro + " AS t" +
", " + key +
", " + value + " " +
fromQuery +
Expand Down Expand Up @@ -478,7 +500,7 @@ func (q *EvalQuery) rateColumns(query string, ast *EvalAST) (string, error) {
return "", fmt.Errorf("amount of arguments must equal 2 for $rateColumns func. Parsed arguments are: %v", args)
}

query, err = q._columns(args[0].(string), args[1].(string), "", fromQuery)
query, err = q._columns(args[0].(string), args[1].(string), "", fromQuery, false)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -519,7 +541,7 @@ func (q *EvalQuery) _prepareColumnsAggregated(macroName string, query string, as
having = " " + fromQuery[havingIndex:]
fromQuery = fromQuery[0 : havingIndex-1]
}
fromQuery = q._applyTimeFilter(fromQuery)
fromQuery = q._applyTimeFilter(fromQuery, false)

var key = args[0].(string)
var keySplit = strings.Split(strings.Trim(key, " \xA0\t\r\n"), " ")
Expand Down Expand Up @@ -714,7 +736,7 @@ func (q *EvalQuery) _rate(args []interface{}, beforeMacrosQuery, fromQuery strin
}
}

fromQuery = q._applyTimeFilter(fromQuery)
fromQuery = q._applyTimeFilter(fromQuery, false)
return beforeMacrosQuery + "SELECT " +
"t," +
" " + strings.Join(cols, ", ") +
Expand Down Expand Up @@ -757,7 +779,7 @@ func (q *EvalQuery) perSecondColumns(query string, ast *EvalAST) (string, error)
having = " " + fromQuery[havingIndex:]
fromQuery = fromQuery[0 : havingIndex-1]
}
fromQuery = q._applyTimeFilter(fromQuery)
fromQuery = q._applyTimeFilter(fromQuery, false)
var maxPerSecond string
if q.UseWindowFuncForMacros {
maxPerSecond = "if((max_0 - lagInFrame(max_0,1,0) OVER ()) < 0 OR lagInFrame(" + alias + ",1," + alias + ") OVER () != " + alias +
Expand Down Expand Up @@ -816,7 +838,7 @@ func (q *EvalQuery) deltaColumns(query string, ast *EvalAST) (string, error) {
having = " " + fromQuery[havingIndex:]
fromQuery = fromQuery[0 : havingIndex-1]
}
fromQuery = q._applyTimeFilter(fromQuery)
fromQuery = q._applyTimeFilter(fromQuery, false)

var maxDelta string
if q.UseWindowFuncForMacros {
Expand Down Expand Up @@ -876,7 +898,7 @@ func (q *EvalQuery) increaseColumns(query string, ast *EvalAST) (string, error)
having = " " + fromQuery[havingIndex:]
fromQuery = fromQuery[0 : havingIndex-1]
}
fromQuery = q._applyTimeFilter(fromQuery)
fromQuery = q._applyTimeFilter(fromQuery, false)
var maxIncrease string
if q.UseWindowFuncForMacros {
maxIncrease = "if((max_0 - lagInFrame(max_0,1,0) OVER ()) < 0 OR lagInFrame(" + alias + ",1," + alias + ") OVER () != " + alias + ", 0, max_0 - lagInFrame(max_0,1,0) OVER ())"
Expand Down Expand Up @@ -940,7 +962,7 @@ func (q *EvalQuery) _perSecond(args []interface{}, beforeMacrosQuery, fromQuery
}
}

fromQuery = q._applyTimeFilter(fromQuery)
fromQuery = q._applyTimeFilter(fromQuery, false)
return beforeMacrosQuery + "SELECT " +
"t," +
" " + strings.Join(cols, ", ") +
Expand Down Expand Up @@ -985,7 +1007,7 @@ func (q *EvalQuery) _delta(args []interface{}, beforeMacrosQuery, fromQuery stri
}
}

fromQuery = q._applyTimeFilter(fromQuery)
fromQuery = q._applyTimeFilter(fromQuery, false)
return beforeMacrosQuery + "SELECT " +
"t," +
" " + strings.Join(cols, ", ") +
Expand Down Expand Up @@ -1030,7 +1052,7 @@ func (q *EvalQuery) _increase(args []interface{}, beforeMacrosQuery, fromQuery s
}
}

fromQuery = q._applyTimeFilter(fromQuery)
fromQuery = q._applyTimeFilter(fromQuery, false)
return beforeMacrosQuery + "SELECT " +
"t," +
" " + strings.Join(cols, ", ") +
Expand All @@ -1043,12 +1065,17 @@ func (q *EvalQuery) _increase(args []interface{}, beforeMacrosQuery, fromQuery s
")", nil
}

func (q *EvalQuery) _applyTimeFilter(query string) string {
func (q *EvalQuery) _applyTimeFilter(query string, useMs bool) string {
timeFilterMacro := "$timeFilter"
if useMs {
timeFilterMacro = "$timeFilterMs"
}
if strings.Contains(strings.ToLower(query), "where") {
whereRe := regexp.MustCompile("(?i)where")
query = whereRe.ReplaceAllString(query, "WHERE $$timeFilter AND")
//don't delete $ it needs for replacing with regexp
query = whereRe.ReplaceAllString(query, "WHERE $"+timeFilterMacro+" AND")
} else {
query += " WHERE $timeFilter"
query += " WHERE " + timeFilterMacro
}

return query
Expand Down Expand Up @@ -1801,7 +1828,7 @@ const joinsRe = "\\b(" +
")\\b"
const onJoinTokenRe = "\\b(using|on)\\b"
const tableNameRe = `([A-Za-z0-9_]+|[A-Za-z0-9_]+\\.[A-Za-z0-9_]+)`
const macroFuncRe = "(\\$deltaColumnsAggregated|\\$increaseColumnsAggregated|\\$perSecondColumnsAggregated|\\$rateColumnsAggregated|\\$rateColumns|\\$perSecondColumns|\\$deltaColumns|\\$increaseColumns|\\$rate|\\$perSecond|\\$delta|\\$increase|\\$columns)"
const macroFuncRe = "(\\$deltaColumnsAggregated|\\$increaseColumnsAggregated|\\$perSecondColumnsAggregated|\\$rateColumnsAggregated|\\$rateColumns|\\$perSecondColumns|\\$deltaColumns|\\$increaseColumns|\\$rate|\\$perSecond|\\$delta|\\$increase|\\$columnsMs|\\$columns)"
const condRe = "\\b(or|and)\\b"
const inRe = "\\b(global in|global not in|not in|in)\\b(?:\\s+\\[\\s*(?:'[^']*'\\s*,\\s*)*'[^']*'\\s*\\])?"
const closureRe = "[\\(\\)\\[\\]]"
Expand Down Expand Up @@ -2098,6 +2125,11 @@ func printAST(AST *EvalAST, tab string) string {
result += printItems(AST.Obj["$columns"].(*EvalAST), tab, ",") + ")"
}

if AST.hasOwnProperty("$columnsMs") {
result += tab + "$columnsMs("
result += printItems(AST.Obj["$columnsMs"].(*EvalAST), tab, ",") + ")"
}

if AST.hasOwnProperty("$rateColumns") {
result += tab + "$rateColumns("
result += printItems(AST.Obj["$rateColumns"].(*EvalAST), tab, ",") + ")"
Expand Down
Loading

0 comments on commit 2bddea7

Please sign in to comment.