diff --git a/docs/clickhouse.md b/docs/clickhouse.md
index cc72973a1..d5651e101 100644
--- a/docs/clickhouse.md
+++ b/docs/clickhouse.md
@@ -1,22 +1,59 @@
 # ClickHouse Support
-Trickster 1.0 provides support for accelerating ClickHouse queries that return time series data normally visualized on a dashboard. Acceleration works by using the Time Series Delta Proxy Cache to minimize the number and time range of queries to the upstream ClickHouse server.
+Trickster 1.1 provides expanded support for accelerating ClickHouse queries that return time series data normally visualized on a dashboard. Acceleration works by using the Time Series Delta Proxy Cache to minimize the number and time range of queries to the upstream ClickHouse server.
 ## Scope of Support
-Trickster is tested with the [ClickHouse DataSource Plugin for Grafana](https://grafana.com/grafana/plugins/vertamedia-clickhouse-datasource) v1.9.3 by Vertamedia, and supports acceleration of queries constructed by this plugin using the plugin's built-in `$timeSeries` macro.
+Trickster is tested with the [ClickHouse DataSource Plugin for Grafana](https://grafana.com/grafana/plugins/vertamedia-clickhouse-datasource) v1.9.3 by Vertamedia, and supports acceleration of queries constructed by this plugin using the plugin's built-in `$timeSeries` macro. Trickster also supports several other query formats that return "time series like" data.
-Because ClickHouse does not provide a golang-based query parser, Trickster uses pre-compiled Regular Expression pattern matches on the incoming ClickHouse query to deconstruct its components, determine if it is cacheable and, if so, what elements are factored into the cache key derivation. We also determine what parts of the query are template-able (e.g., `time BETWEEN $time1 AND $time2`) based on the provided absolute values, in order to normalize the query before hashing the cache key.
+Because ClickHouse does not provide a golang-based query parser, Trickster uses custom parsing code on the incoming ClickHouse query to deconstruct its components, determine if it is cacheable and, if so, what elements are factored into the cache key derivation. Trickster also determines the requested time range and step based on the provided absolute values, in order to normalize the query before hashing the cache key (see the sketch below).
-If you find query or response structures that are not yet supported, or providing inconsistent or unexpected results, we'd love for you to report those. We also always welcome any contributions around this functionality. The regular expression patterns we currently use will likely grow in complexity as support for more query patterns is added. Thus, we may need to find a more robust query parsing solution, and welcome any assistance with that as well.
+If you find query or response structures that are not yet supported, or that provide inconsistent or unexpected results, we'd love for you to report those. We also always welcome any contributions around this functionality.
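+
+For illustration only (a rough sketch, not the output of any particular Trickster version), a hypothetical query whose WHERE clause reads `datetime BETWEEN 1589904000 AND 1589997600` is normalized by replacing the absolute
+range with internal tokens before the cache key is hashed, so that requests differing only in their time range hash to the same key:
+
+```sql
+-- incoming (absolute) time range
+WHERE datetime BETWEEN 1589904000 AND 1589997600
+-- normalized (tokenized) form used for cache key derivation
+WHERE (datetime >= <$TIMESTAMP1$> AND datetime < <$TIMESTAMP2$>)
+```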
-Trickster currently supports the following query patterns (case-insensitive) in the JSON response format, which align with the output of the ClickHouse Data Source Plugin for Grafana:
+To constitute a cacheable query, the first column expression in the main or any subquery must be in one of two specific forms in order to determine the timestamp column and step:
+#### Grafana Plugin Format
 ```sql
-SELECT (intDiv(toUInt32(time_col), 60) * 60) * 1000 AS t, countMerge(val_col) AS cnt, field1, field2
-FROM exampledb.example_table WHERE time_col BETWEEN toDateTime(1574686300) AND toDateTime(1574689900)
- AND field1 > 0 AND field2 = 'some_value' GROUP BY t, field1, field2 ORDER BY t, field1, field2
-FORMAT JSON
+SELECT (intDiv(toUInt32(time_col), 60) * 60) [* 1000] [as] [alias]
 ```
+This is the approach used by the Grafana plugin. The time_col and/or alias is used to determine the requested time range from the WHERE or PREWHERE clause of the query. The second argument to the ClickHouse intDiv function is the step value in seconds, since the toUInt32 function on a datetime column returns the Unix epoch seconds.
-In this format, the first column must be the datapoint's timestamp, the second column must be the datapoint's value, and all additional fields define the datapoint's metric name. The time column must be in the format of `(intDiv(toUInt32($time_col), $period) * $period) * 1000`, and the value column must be numeric (integer or floating point). The where clause must include `time_col > toDateTime($epoch)` or `time_col BETWEEN toDateTime($epoch1) AND toDateTime($epoch2)`. Subqueries and other modifications are compatible so long as the key components of the time series, mentioned here, can be extracted.
+#### ClickHouse Time Grouping Function
+```sql
+SELECT toStartOf[Period](time_col) [as] [alias]
+```
+This approach uses one of the following optimized ClickHouse functions to group time series results:
+```
+toStartOfMinute
+toStartOfFiveMinute
+toStartOfTenMinutes
+toStartOfFifteenMinutes
+toStartOfHour
+toDate
+```
+Again, the time_col and/or alias is used to determine the requested time range from the WHERE or PREWHERE clause, and the step is derived from the function name.
+
+#### Determining the requested time range
+
+Once the time column (or alias) and step are derived, Trickster parses each WHERE or PREWHERE clause to find comparison operations
+that mark the requested time range. To be cacheable, the WHERE clause must contain either a `[time_col|alias] BETWEEN` phrase or
+a `[time_col|alias] >[=]` phrase. The BETWEEN or >= arguments must each be a parseable ClickHouse string date in the form `2006-01-02 15:04:05`,
+a ten-digit integer representing epoch seconds, or the `now()` ClickHouse function with optional subtraction.
+
+If a `>` phrase is used, a similar `<` phrase can be used to specify the end of the time period. If none is found, Trickster will still cache results up to
+the current time, but future queries must also have no end time phrase, or Trickster will be unable to find the correct cache key.
+
+Examples of cacheable time range WHERE clauses:
+```sql
+WHERE t >= '2020-10-15 00:00:00' and t <= '2020-10-16 12:00:00'
+WHERE t >= '2020-10-15 12:00:00' and t < now() - 60 * 60
+WHERE datetime BETWEEN 1574686300 AND 1574689900
+```
+
+Note that these values can be wrapped in the ClickHouse toDateTime function, but ClickHouse will make that conversion implicitly and it is not required. All string times are assumed to be UTC.
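+
+Putting the pieces together, a complete query in the time grouping form that Trickster would treat as cacheable might look like the following
+(the table and column names here are illustrative only):
+
+```sql
+SELECT toStartOfFiveMinute(datetime) AS t, count() AS cnt
+FROM example_db.example_table
+WHERE datetime BETWEEN '2020-10-15 00:00:00' AND '2020-10-16 12:00:00'
+GROUP BY t ORDER BY t
+FORMAT JSON
+```
+
+Here the step is five minutes (from `toStartOfFiveMinute`), the time column is `datetime`, and the BETWEEN phrase supplies the absolute time range.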
+
+### Normalization and "Fast Forwarding"
+
+Trickster will always normalize the calculated time range to fit the step size, so small variations in the time range will still result in actual queries for
+the entire time "bucket". In addition, Trickster will not cache the results for the portion of the query that is still active -- i.e., within the current bucket
+or within the configured backfill tolerance setting (whichever is greater). For example, with a 300-second step, a query ending at 12:02:00 is padded out to the 12:05:00 bucket boundary, and data in that still-filling bucket is fetched from ClickHouse rather than served from the cache.
diff --git a/pkg/proxy/origins/clickhouse/clickhouse.go b/pkg/proxy/origins/clickhouse/clickhouse.go index d46818a30..685cd6acf 100644 --- a/pkg/proxy/origins/clickhouse/clickhouse.go +++ b/pkg/proxy/origins/clickhouse/clickhouse.go @@ -18,18 +18,18 @@ package clickhouse import ( + "github.com/tricksterproxy/trickster/pkg/proxy/request" "net/http" "net/url" + "time" "github.com/tricksterproxy/trickster/pkg/cache" "github.com/tricksterproxy/trickster/pkg/proxy" "github.com/tricksterproxy/trickster/pkg/proxy/errors" "github.com/tricksterproxy/trickster/pkg/proxy/origins" oo "github.com/tricksterproxy/trickster/pkg/proxy/origins/options" - tt "github.com/tricksterproxy/trickster/pkg/proxy/timeconv" "github.com/tricksterproxy/trickster/pkg/proxy/urls" "github.com/tricksterproxy/trickster/pkg/timeseries" - "github.com/tricksterproxy/trickster/pkg/util/regexp/matching" ) var _ origins.Client = (*Client)(nil) @@ -92,39 +92,34 @@ func (c *Client) Router() http.Handler { // ParseTimeRangeQuery parses the key parts of a TimeRangeQuery from the inbound HTTP Request func (c *Client) ParseTimeRangeQuery(r *http.Request) (*timeseries.TimeRangeQuery, error) { - - trq := &timeseries.TimeRangeQuery{Extent: timeseries.Extent{}} - trq.TemplateURL = urls.Clone(r.URL) - qi := trq.TemplateURL.Query() + qi := r.URL.Query() + var rawQuery string if p, ok := qi[upQuery]; ok { - trq.Statement = p[0] + rawQuery = p[0] } else { return nil, errors.MissingURLParam(upQuery) } - mp := []string{"step", "timeField"} - found := matching.GetNamedMatches(reTimeFieldAndStep, trq.Statement, mp) - - for _, f := range mp { - v, ok := found[f] - if !ok || v == "" { - return nil, errors.ErrNotTimeRangeQuery - } - switch f { - case "timeField": - trq.TimestampFieldName = v - case "step": - trq.Step, _ = tt.ParseDuration(v + "s") - } + var bf time.Duration + res := request.GetResources(r) + if res == nil { + bf = 60 * time.Second + } else { + bf = res.OriginConfig.BackfillTolerance } - var err error - trq.Statement, trq.Extent, _, err = getQueryParts(trq.Statement, trq.TimestampFieldName) - if err != nil { + // Force gzip compression since Brotli is broken on CH 20.3 + // See https://github.com/ClickHouse/ClickHouse/issues/9969 + // Clients that don't understand gzip are going to break, but oh well + r.Header.Set("Accept-Encoding", "gzip") + + trq := &timeseries.TimeRangeQuery{Extent: timeseries.Extent{}, BackfillTolerance: bf} + if err := parseRawQuery(rawQuery, trq); err != nil { return nil, err } - // Swap in the Tokenzed Query in the Url Params + trq.TemplateURL = urls.Clone(r.URL) + // Swap in the Tokenized Query in the Url Params qi.Set(upQuery, trq.Statement) trq.TemplateURL.RawQuery = qi.Encode() return trq, nil diff --git a/pkg/proxy/origins/clickhouse/clickhouse_test.go b/pkg/proxy/origins/clickhouse/clickhouse_test.go index 87bb77493..0436f8393 100644 --- a/pkg/proxy/origins/clickhouse/clickhouse_test.go +++ b/pkg/proxy/origins/clickhouse/clickhouse_test.go @@ -157,17 +157,17 @@ func TestParseTimeRangeQuery(t *testing.T) { Host: "blah.com", Path: "/", RawQuery: testRawQuery(), - }} + }, + Header: 
http.Header{}, + } client := &Client{} res, err := client.ParseTimeRangeQuery(req) if err != nil { t.Error(err) } else { - if res.Step.Seconds() != 60 { t.Errorf("expected 60 got %f", res.Step.Seconds()) } - if res.Extent.End.Sub(res.Extent.Start).Hours() != 6 { t.Errorf("expected 6 got %f", res.Extent.End.Sub(res.Extent.Start).Hours()) } diff --git a/pkg/proxy/origins/clickhouse/handler_proxy.go b/pkg/proxy/origins/clickhouse/handler_proxy.go index 49318bb6e..41d6f0289 100644 --- a/pkg/proxy/origins/clickhouse/handler_proxy.go +++ b/pkg/proxy/origins/clickhouse/handler_proxy.go @@ -24,7 +24,7 @@ import ( ) // ProxyHandler sends a request through the basic reverse proxy to the origin, -// and services non-cacheable InfluxDB API calls +// and services non-cacheable ClickHouse API calls func (c *Client) ProxyHandler(w http.ResponseWriter, r *http.Request) { r.URL = urls.BuildUpstreamURL(r, c.baseUpstreamURL) engines.DoProxy(w, r, true) diff --git a/pkg/proxy/origins/clickhouse/handler_query.go b/pkg/proxy/origins/clickhouse/handler_query.go index 39bd251e9..4f314850d 100644 --- a/pkg/proxy/origins/clickhouse/handler_query.go +++ b/pkg/proxy/origins/clickhouse/handler_query.go @@ -28,9 +28,7 @@ import ( func (c *Client) QueryHandler(w http.ResponseWriter, r *http.Request) { rqlc := strings.Replace(strings.ToLower(r.URL.RawQuery), "%20", "+", -1) - // if it's not a select statement, just proxy it instead - if (!strings.HasPrefix(rqlc, "query=select+")) && (!(strings.Index(rqlc, "&query=select+") > 0)) && - (!strings.HasSuffix(rqlc, "format+json")) { + if (!strings.HasPrefix(rqlc, "query=")) && (!(strings.Index(rqlc, "&query=") > 0)) || r.Method != http.MethodGet { c.ProxyHandler(w, r) return } diff --git a/pkg/proxy/origins/clickhouse/handler_query_test.go b/pkg/proxy/origins/clickhouse/handler_query_test.go index f473c02f3..faaf8bb6d 100644 --- a/pkg/proxy/origins/clickhouse/handler_query_test.go +++ b/pkg/proxy/origins/clickhouse/handler_query_test.go @@ -37,8 +37,7 @@ func testRawQuery() string { } func testNonSelectQuery() string { - return url.Values(map[string][]string{"query": { - `UPDATE (intDiv(toUInt32(time_column), 60) * 60) * 1000 AS t`}}).Encode() + return url.Values(map[string][]string{"enable_http_compression": {"1"}}).Encode() // not a real query, just something to trigger a non-select proxy-only request } diff --git a/pkg/proxy/origins/clickhouse/model.go b/pkg/proxy/origins/clickhouse/model.go index 658adfc02..1614ef2ab 100644 --- a/pkg/proxy/origins/clickhouse/model.go +++ b/pkg/proxy/origins/clickhouse/model.go @@ -17,10 +17,8 @@ package clickhouse import ( - "bytes" "encoding/json" "fmt" - "sort" "strconv" "strings" "time" @@ -34,312 +32,191 @@ const ( nanosPerMillisecond = int64(time.Millisecond / time.Nanosecond) ) -// ErrNotEnoughFields returns an error when there are too few number of fields -func ErrNotEnoughFields(count int) error { - return fmt.Errorf("must have at least two fields; only have %d", count) -} - -func msToTime(ms string) (time.Time, error) { - msInt, err := strconv.ParseInt(ms, 10, 64) - if err != nil { - return time.Time{}, err - } - return time.Unix(msInt/millisPerSecond, - (msInt%millisPerSecond)*nanosPerMillisecond), nil -} - -// FieldDefinition ... type FieldDefinition struct { Name string `json:"name"` Type string `json:"type"` } -// ResponseValue ... type ResponseValue map[string]interface{} -// DataSet ... -type DataSet struct { - Metric map[string]interface{} - Points []Point -} - -// Points ... -type Points []Point - -// Point ... 
type Point struct { Timestamp time.Time - Value float64 + Values []ResponseValue } -// Response is the JSON responose document structure for ClickHouse query results +// Response is the JSON response document structure for ClickHouse query results type Response struct { - Meta []FieldDefinition `json:"meta"` - RawData []ResponseValue `json:"data"` - Rows int `json:"rows"` - Order []string `json:"-"` - StepDuration time.Duration `json:"step,omitempty"` - ExtentList timeseries.ExtentList `json:"extents,omitempty"` + Meta []FieldDefinition `json:"meta"` + RawData []ResponseValue `json:"data"` + Rows int `json:"rows"` } // ResultsEnvelope is the ClickHouse document structure optimized for time series manipulation type ResultsEnvelope struct { - Meta []FieldDefinition `json:"meta"` - Data map[string]*DataSet `json:"data"` - StepDuration time.Duration `json:"step,omitempty"` - ExtentList timeseries.ExtentList `json:"extents,omitempty"` - Serializers map[string]func(interface{}) `json:"-"` - SeriesOrder []string `json:"series_order,omitempty"` + Meta []FieldDefinition `json:"meta"` + Data []Point `json:"data"` + StepDuration time.Duration `json:"step,omitempty"` + ExtentList timeseries.ExtentList `json:"extents,omitempty"` timestamps map[time.Time]bool // tracks unique timestamps in the matrix data - tslist times.Times - isSorted bool // tracks if the matrix data is currently sorted - isCounted bool // tracks if timestamps slice is up-to-date + tsList times.Times + isSorted bool + isCounted bool } -// MarshalTimeseries converts a Timeseries into a JSON blob -func (c *Client) MarshalTimeseries(ts timeseries.Timeseries) ([]byte, error) { - // Marshal the Envelope back to a json object for Cache Storage - return json.Marshal(ts.(*ResultsEnvelope)) -} +type fromTimeFunc func(interface{}) (time.Time, error) +type toTimeFunc func(time.Time) interface{} -// UnmarshalTimeseries converts a JSON blob into a Timeseries -func (c *Client) UnmarshalTimeseries(data []byte) (timeseries.Timeseries, error) { - re := &ResultsEnvelope{} - err := json.Unmarshal(data, re) - return re, err +var toMsString toTimeFunc = func(t time.Time) interface{} { + return strconv.FormatInt(t.UnixNano()/nanosPerMillisecond, 10) } -// Parts ... -func (rv ResponseValue) Parts(timeKey, valKey string) (string, time.Time, float64, ResponseValue) { - - if len(rv) < 3 { - return noParts() - } - - labels := make([]string, 0, len(rv)-2) - var t time.Time - var val float64 +var fromMsString fromTimeFunc = func(v interface{}) (time.Time, error) { + var msInt int64 var err error - - meta := make(ResponseValue) - - for k, v := range rv { - switch k { - case timeKey: - t, err = msToTime(v.(string)) - if err != nil { - return noParts() - } - case valKey: - if av, ok := v.(float64); ok { - val = av - continue - } - val, err = strconv.ParseFloat(v.(string), 64) - if err != nil { - return noParts() - } - default: - meta[k] = v - labels = append(labels, fmt.Sprintf("%s=%v", k, v)) + s, ok := v.(string) + if ok { + msInt, err = strconv.ParseInt(s, 10, 64) + if err != nil { + return time.Time{}, err } - } - sort.Strings(labels) - return fmt.Sprintf("{%s}", strings.Join(labels, ";")), t, val, meta -} - -func noParts() (string, time.Time, float64, ResponseValue) { - return "{}", time.Time{}, 0.0, ResponseValue{} -} - -// MarshalJSON ... 
-func (re ResultsEnvelope) MarshalJSON() ([]byte, error) { - - if len(re.Meta) < 2 { - return nil, ErrNotEnoughFields(len(re.Meta)) - } - - var mpl, fl int - for _, v := range re.Data { - lp := len(v.Points) - fl += lp - if mpl < lp { - mpl = lp + } else { + f, ok := v.(float64) + if !ok { + return time.Time{}, fmt.Errorf("unrecognized JSON type for timestamp") } + msInt = int64(f) } + return time.Unix(msInt/millisPerSecond, (msInt%millisPerSecond)*nanosPerMillisecond), nil - rsp := &Response{ - Meta: re.Meta, - RawData: make([]ResponseValue, 0, fl), - Rows: re.ValueCount(), - StepDuration: re.StepDuration, - ExtentList: re.ExtentList, - } - - rsp.Order = make([]string, 0, len(re.Meta)) - for _, k := range re.Meta { - rsp.Order = append(rsp.Order, k.Name) - } - - // Assume the first item in the meta array is the time, and the second is the value - timestampFieldName := rsp.Order[0] - valueFieldName := rsp.Order[1] - - tm := make(map[time.Time][]ResponseValue) - tl := make(times.Times, 0, mpl) - - l := len(re.Data) - - prepareMarshalledPoints := func(ds *DataSet) { - - var ok bool - var t []ResponseValue - - for _, p := range ds.Points { - - t, ok = tm[p.Timestamp] - if !ok { - tl = append(tl, p.Timestamp) - t = make([]ResponseValue, 0, l) - } - - r := ResponseValue{ - timestampFieldName: strconv.FormatInt(p.Timestamp.UnixNano()/int64(time.Millisecond), 10), - valueFieldName: strconv.FormatFloat(p.Value, 'f', -1, 64), - } - for k2, v2 := range ds.Metric { - r[k2] = v2 - } - - t = append(t, r) - tm[p.Timestamp] = t - } - } - - for _, key := range re.SeriesOrder { - if ds, ok := re.Data[key]; ok { - prepareMarshalledPoints(ds) - } - } - - sort.Sort(tl) - - for _, t := range tl { - rsp.RawData = append(rsp.RawData, tm[t]...) - } +} - bytes, err := json.Marshal(rsp) - if err != nil { - return nil, err +var fromSec fromTimeFunc = func(v interface{}) (time.Time, error) { + fv, ok := v.(float64) + if !ok { + return time.Time{}, fmt.Errorf("json time value incorrect type") } - - return bytes, nil + return time.Unix(int64(fv), 0), nil +} +var toSec toTimeFunc = func(t time.Time) interface{} { + return t.Unix() } -// MarshalJSON ... -func (rsp *Response) MarshalJSON() ([]byte, error) { +var utcLoc *time.Location - buf := &bytes.Buffer{} - buf.WriteString(`{"meta":`) - meta, _ := json.Marshal(rsp.Meta) - buf.Write(meta) - buf.WriteString(`,"data":[`) - d := make([]string, 0, len(rsp.RawData)) - for _, rd := range rsp.RawData { - d = append(d, string(rd.ToJSON(rsp.Order))) - } - buf.WriteString(strings.Join(d, ",") + "]") - buf.WriteString(fmt.Sprintf(`,"rows": %d`, rsp.Rows)) +func init() { + utcLoc, _ = time.LoadLocation("UTC") +} - if rsp.ExtentList != nil && len(rsp.ExtentList) > 0 { - el, _ := json.Marshal(rsp.ExtentList) - buf.WriteString(fmt.Sprintf(`,"extents": %s`, string(el))) - } +const chLayout = "2006-01-02 15:04:05" - buf.WriteString("}") +var fromDateString fromTimeFunc = func(v interface{}) (time.Time, error) { + return time.ParseInLocation(chLayout, v.(string), utcLoc) +} - b := buf.Bytes() +var toDateString toTimeFunc = func(t time.Time) interface{} { + return t.In(utcLoc).Format(chLayout) +} - return b, nil +// Converts a Timeseries into a JSON blob +func (c *Client) MarshalTimeseries(ts timeseries.Timeseries) ([]byte, error) { + return json.Marshal(ts.(*ResultsEnvelope)) } -// ToJSON ... 
-func (rv ResponseValue) ToJSON(order []string) []byte { - buf := &bytes.Buffer{} - buf.WriteString("{") - lines := make([]string, 0, len(rv)) - for _, k := range order { - if v, ok := rv[k]; ok { +// Converts a JSON blob into a Timeseries +func (c *Client) UnmarshalTimeseries(data []byte) (timeseries.Timeseries, error) { + re := &ResultsEnvelope{} + err := json.Unmarshal(data, re) + return re, err +} - // cleanup here - j, err := json.Marshal(v) - if err != nil { - continue +func (re ResultsEnvelope) MarshalJSON() ([]byte, error) { + if len(re.Meta) == 0 { + return nil, fmt.Errorf("no metadata in ResultsEnvelope") + } + tsField := re.Meta[0].Name + tsType := re.Meta[0].Type + var ttf toTimeFunc + if strings.HasPrefix(tsType, "DateTime") { + ttf = toDateString + } else if strings.HasSuffix(tsType, "t64") { + ttf = toMsString + } else { + ttf = toSec + } + rsp := &Response{ + Meta: re.Meta, + RawData: make([]ResponseValue, 0, len(re.Data)), + } + rows := 0 + for _, p := range re.Data { + rows += len(p.Values) + for _, sp := range p.Values { + rv := ResponseValue{tsField: ttf(p.Timestamp)} + for k, v := range sp { + rv[k] = v } - lines = append(lines, fmt.Sprintf(`"%s":%s`, k, string(j))) + rsp.RawData = append(rsp.RawData, rv) } } - buf.WriteString(strings.Join(lines, ",") + "}") - return buf.Bytes() + rsp.Rows = rows + return json.Marshal(rsp) } -// UnmarshalJSON ... -func (re *ResultsEnvelope) UnmarshalJSON(b []byte) error { +func (re ResultsEnvelope) SeriesCount() int { + return 1 +} +func (re *ResultsEnvelope) UnmarshalJSON(b []byte) error { response := Response{} - err := json.Unmarshal(b, &response) - if err != nil { + if err := json.Unmarshal(b, &response); err != nil { return err } + re.isSorted = false + re.isCounted = false + re.Meta = response.Meta + re.Data = make([]Point, 0, len(response.RawData)) - if len(response.Meta) < 2 { - return ErrNotEnoughFields(len(response.Meta)) + if len(response.RawData) == 0 { + return nil // No data points, we're done } - re.Meta = response.Meta - re.ExtentList = response.ExtentList - re.StepDuration = response.StepDuration - re.SeriesOrder = make([]string, 0) - - // Assume the first item in the meta array is the time field, and the second is the value field - timestampFieldName := response.Meta[0].Name - valueFieldName := response.Meta[1].Name + tsField := response.Meta[0].Name + tsType := response.Meta[0].Type - registeredMetrics := make(map[string]bool) + var ftf fromTimeFunc + if strings.HasPrefix(tsType, "DateTime") { + ftf = fromDateString + } else if strings.HasSuffix(tsType, "t64") { + ftf = fromMsString + } else if strings.HasSuffix(tsType, "t32") { + ftf = fromSec + } else { + return fmt.Errorf("timestamp field not of recognized type") + } - re.Data = make(map[string]*DataSet) - l := len(response.RawData) + pMap := make(map[int64]*Point) for _, v := range response.RawData { - metric, ts, val, meta := v.Parts(timestampFieldName, valueFieldName) - if _, ok := registeredMetrics[metric]; !ok { - registeredMetrics[metric] = true - re.SeriesOrder = append(re.SeriesOrder, metric) + tv, ok := v[tsField] + if !ok { + return fmt.Errorf("missing timestamp field in response data") } - if !ts.IsZero() { - a, ok := re.Data[metric] - if !ok { - a = &DataSet{Metric: meta, Points: make([]Point, 0, l)} - } - a.Points = append(a.Points, Point{Timestamp: ts, Value: val}) - re.Data[metric] = a + ts, err := ftf(tv) + if err != nil { + return fmt.Errorf("timestamp field does not parse to date") } + delete(v, tsField) + pk := ts.Unix() + p, ok := pMap[pk] + if 
!ok { + p = &Point{Timestamp: ts} + pMap[pk] = p + } + p.Values = append(p.Values, v) } - + for _, p := range pMap { + re.Data = append(re.Data, *p) + } + re.Sort() return nil } - -// Len returns the length of a slice of time series data points -func (p Points) Len() int { - return len(p) -} - -// Less returns true if i comes before j -func (p Points) Less(i, j int) bool { - return p[i].Timestamp.Before(p[j].Timestamp) -} - -// Swap modifies a slice of time series data points by swapping the values in indexes i and j -func (p Points) Swap(i, j int) { - p[i], p[j] = p[j], p[i] -} diff --git a/pkg/proxy/origins/clickhouse/model_test.go b/pkg/proxy/origins/clickhouse/model_test.go index fc6501bcf..60671da98 100644 --- a/pkg/proxy/origins/clickhouse/model_test.go +++ b/pkg/proxy/origins/clickhouse/model_test.go @@ -17,166 +17,110 @@ package clickhouse import ( - "encoding/json" + tt "github.com/tricksterproxy/trickster/pkg/proxy/timeconv" + "github.com/tricksterproxy/trickster/pkg/timeseries" "reflect" - "sort" "testing" "time" - - "github.com/tricksterproxy/trickster/pkg/timeseries" ) -func TestParts(t *testing.T) { +func newRe() *ResultsEnvelope { + return &ResultsEnvelope{ExtentList: timeseries.ExtentList{}, Data: make([]Point, 0)} +} - rv1 := ResponseValue{ - "t": "1557766080000", - "cnt": "27", - "meta1": 200, - "meta2": "value3", +func (re *ResultsEnvelope) addMeta(values ...string) *ResultsEnvelope { + for i := 0; i < len(values); i += 2 { + re.Meta = append(re.Meta, FieldDefinition{Name: values[i], Type: values[i+1]}) } + return re +} - metric, ts, val, _ := rv1.Parts("t", "cnt") - - expectedTs := time.Unix(1557766080, 0) - expectedMetric := "{meta1=200;meta2=value3}" - var expectedValue float64 = 27 - - if ts != expectedTs { - t.Errorf("expected %d got %d", expectedTs.Unix(), ts.Unix()) +func (re *ResultsEnvelope) addPoint(ts int, values ...interface{}) *ResultsEnvelope { + v := ResponseValue{} + for i := 0; i < len(re.Meta)-1; i++ { + v[re.Meta[i+1].Name] = values[i] } + re.Data = append(re.Data, Point{Timestamp: time.Unix(int64(ts), 0), Values: []ResponseValue{v}}) + return re +} - if metric != expectedMetric { - t.Errorf("expected %s got %s", expectedMetric, metric) - } +func (re *ResultsEnvelope) addExtent(e int64) *ResultsEnvelope { + re.ExtentList = append(re.ExtentList, timeseries.Extent{Start: time.Unix(e, 0), End: time.Unix(e, 0)}) + return re +} - if val != expectedValue { - t.Errorf("expected %f got %f", expectedValue, val) +func (re *ResultsEnvelope) addExtents(e ...int64) *ResultsEnvelope { + for i := 0; i < len(e); i += 2 { + re.ExtentList = append(re.ExtentList, timeseries.Extent{Start: time.Unix(e[i], 0), End: time.Unix(e[i+1], 0)}) } + return re +} - rv2 := ResponseValue{ - "t": "1557766080000", - "cnt": "27", - } +func (re *ResultsEnvelope) setStep(s string) *ResultsEnvelope { + re.StepDuration, _ = tt.ParseDuration(s) + return re +} - metric, _, _, _ = rv2.Parts("t", "cnt") - if metric != "{}" { - t.Errorf("expected '{}' got %s", metric) - } +var testJSON1 = `{"meta":[{"name":"t","type":"UInt64"},{"name":"cnt","type":"UInt64"},` + + `{"name":"meta1","type":"UInt16"},{"name":"meta2","type":"String"}],` + + `"data":[{"cnt":"12648509","meta1":200,"meta2":"value2","t":"1557766080000"},` + + `{"cnt":"10260032","meta1":200,"meta2":"value3","t":"1557766680000"},` + + `{"cnt":"1","meta1":206,"meta2":"value3","t":"1557767280000"}],"rows":3}` - rv3 := ResponseValue{ - "t": "A557766080000", - "cnt": "27", - "meta1": 200, - } +var testJSONStringDate = 
`{"meta":[{"name":"t","type":"DateTime('Etc/UTC')"},{"name":"cnt","type":"UInt64"},` + + `{"name":"meta1","type":"UInt16"},{"name":"meta2","type":"String"}],` + + `"data":[{"cnt":"12648509","meta1":200,"meta2":"value2","t":"2019-05-13 16:48:00"},` + + `{"cnt":"10260032","meta1":200,"meta2":"value3","t":"2019-05-13 16:58:00"},` + + `{"cnt":"1","meta1":206,"meta2":"value3","t":"2019-05-13 17:08:00"}],"rows":3}` - metric, _, _, _ = rv3.Parts("t", "cnt") - if metric != "{}" { - t.Errorf("expected '{}' got %s", metric) - } +var testJSONUInt32Date = `{"meta":[{"name":"t","type":"UInt32"},{"name":"cnt","type":"UInt64"},` + + `{"name":"meta1","type":"UInt16"},{"name":"meta2","type":"String"}],` + + `"data":[{"cnt":"12648509","meta1":200,"meta2":"value2","t":1557766080},` + + `{"cnt":"10260032","meta1":200,"meta2":"value3","t":1557766680},` + + `{"cnt":"1","meta1":206,"meta2":"value3","t":1557767280}],"rows":3}` - rv4 := ResponseValue{ - "t": "1557766080000", - "cnt": "2a7", - "meta1": 200, - } +var testEmptyJSON = `{"meta":[{"name":"t","type":"UInt64"},{"name":"cnt","type":"UInt64"},` + + `{"name":"meta1","type":"UInt16"},{"name":"meta2","type":"String"}],` + + `"data":[],"rows": 0}` - metric, _, _, _ = rv4.Parts("t", "cnt") - if metric != "{}" { - t.Errorf("expected '{}' got %s", metric) - } +var testJSONInt32 = `{"meta":[{"name":"t","type":"UInt32"},{"name":"cnt","type":"UInt64"},` + + `{"name":"meta1","type":"UInt16"},{"name":"meta2","type":"String"}],` + + `"data":[{"cnt":"12648509","meta1":200,"meta2":"value2","t":1557766080},` + + `{"cnt":"10260032","meta1":200,"meta2":"value3","t":1557766680},` + + `{"cnt":"1","meta1":206,"meta2":"value3","t":1557767280}],"rows":3}` - rv5 := ResponseValue{ - "t": "1557766080000", - "cnt": 27.5, - "meta1": 200, - } +var testJSONInt64 = `{"meta":[{"name":"t","type":"UInt64"},{"name":"cnt","type":"UInt64"},` + + `{"name":"meta1","type":"UInt16"},{"name":"meta2","type":"String"}],` + + `"data":[{"cnt":"12648509","meta1":200,"meta2":"value2","t":1557766080000},` + + `{"cnt":"10260032","meta1":200,"meta2":"value3","t":1557766680000},` + + `{"cnt":"1","meta1":206,"meta2":"value3","t":1557767280000}],"rows":3}` - metric, _, _, _ = rv5.Parts("t", "cnt") - if metric != "{meta1=200}" { - t.Errorf("expected '{meta1=200}' got %s", metric) - } +var testJSONDateTime = `{"meta":[{"name":"t","type":"DateTime('Etc\/UTC')"},{"name":"cnt","type":"UInt64"},` + + `{"name":"meta1","type":"UInt16"},{"name":"meta2","type":"String"}],` + + `"data":[{"cnt":"12648509","meta1":200,"meta2":"value2","t":"2019-05-13 16:48:00"},` + + `{"cnt":"10260032","meta1":200,"meta2":"value3","t":"2019-05-13 16:58:00"},` + + `{"cnt":"1","meta1":206,"meta2":"value3","t":"2019-05-13 17:08:00"}],"rows":3}` -} +var testRE = newRe().addMeta("t", "UInt64", "cnt", "UInt64", "meta1", "UInt16", "meta2", "String"). + addPoint(1557766080, "12648509", 200, "value2"). + addPoint(1557766680, "10260032", 200, "value3"). 
+ addPoint(1557767280, "1", 206, "value3") -var testJSON1 = []byte(`{"meta":[{"name":"t","type":"UInt64"},{"name":"cnt","type":"UInt64"},` + - `{"name":"meta1","type":"UInt16"},{"name":"meta2","type":"String"}],` + - `"data":[{"t":"1557766080000","cnt":"12648509","meta1":200,"meta2":"value2"},` + - `{"t":"1557766080000","cnt":"10260032","meta1":200,"meta2":"value3"},` + - `{"t":"1557766080000","cnt":"1","meta1":206,"meta2":"value3"}],"rows":3}`, -) -var testJSON2 = []byte(`{"meta":[{"name":"t"}],"data":[{"t":"1557766080000","cnt":"12648509",` + - `"meta1":200,"meta2":"value2"},{"t":"1557766080000","cnt":"10260032","meta1":200,"meta2":"value3"},` + - `{"t":"1557766080000","cnt":"1","meta1":206,"meta2":"value3"}],"rows":3}`, -) // should generate error - -var testRE1 = &ResultsEnvelope{ - Meta: []FieldDefinition{ - { - Name: "t", - Type: "UInt64", - }, - { - Name: "cnt", - Type: "UInt64", - }, - { - Name: "meta1", - Type: "UInt16", - }, - { - Name: "meta2", - Type: "String", - }, - }, - - SeriesOrder: []string{"1", "2", "3"}, - - Data: map[string]*DataSet{ - "1": { - Metric: map[string]interface{}{ - "meta1": 200, - "meta2": "value2", - }, - Points: []Point{ - { - Timestamp: time.Unix(1557766080, 0), - Value: 12648509, - }, - }, - }, - "2": { - Metric: map[string]interface{}{ - "meta1": 200, - "meta2": "value3", - }, - Points: []Point{ - { - Timestamp: time.Unix(1557766080, 0), - Value: 10260032, - }, - }, - }, - "3": { - Metric: map[string]interface{}{ - "meta1": 206, - "meta2": "value3", - }, - Points: []Point{ - { - Timestamp: time.Unix(1557766080, 0), - Value: 1, - }, - }, - }, - }, -} +var testREStringDate = newRe().addMeta("t", "DateTime('Etc/UTC')", "cnt", "UInt64", "meta1", "UInt16", "meta2", "String"). + addPoint(1557766080, "12648509", 200, "value2"). + addPoint(1557766680, "10260032", 200, "value3"). + addPoint(1557767280, "1", 206, "value3") -func TestREMarshalJSON(t *testing.T) { +var testREUInt32Date = newRe().addMeta("t", "UInt32", "cnt", "UInt64", "meta1", "UInt16", "meta2", "String"). + addPoint(1557766080, "12648509", 200, "value2"). + addPoint(1557766680, "10260032", 200, "value3"). 
+ addPoint(1557767280, "1", 206, "value3") +func TestREMarshalJSON(t *testing.T) { expectedLen := len(testJSON1) re := ResultsEnvelope{} - err := re.UnmarshalJSON(testJSON1) + err := re.UnmarshalJSON([]byte(testJSON1)) if err != nil { t.Error(err) } @@ -193,38 +137,14 @@ func TestREMarshalJSON(t *testing.T) { re.Meta = re.Meta[:0] _, err = re.MarshalJSON() if err == nil { - t.Errorf("expected error: %s", `Must have at least two fields; only have 0`) - } - -} - -func TestRSPMarshalJSON(t *testing.T) { - - rsp := &Response{ExtentList: timeseries.ExtentList{{Start: time.Unix(0, 0), End: time.Unix(5, 0)}}} - - bytes, err := rsp.MarshalJSON() - if err != nil { - t.Error(err) - return - } - - rsp1 := &Response{} - json.Unmarshal(bytes, rsp1) - - if rsp.ExtentList[0].Start.Unix() != rsp1.ExtentList[0].Start.Unix() { - t.Errorf("expected %d got %d", rsp.ExtentList[0].Start.Unix(), rsp.ExtentList[0].Start.Unix()) - } - - if rsp.ExtentList[0].End.Unix() != rsp1.ExtentList[0].End.Unix() { - t.Errorf("expected %d got %d", rsp.ExtentList[0].End.Unix(), rsp.ExtentList[0].End.Unix()) + t.Errorf("expected error: %s", `no Meta in ResultsEnvelope`) } } func TestUnmarshalTimeseries(t *testing.T) { - client := &Client{} - ts, err := client.UnmarshalTimeseries(testJSON1) + ts, err := client.UnmarshalTimeseries([]byte(testJSON1)) if err != nil { t.Error(err) return @@ -248,31 +168,44 @@ func TestUnmarshalTimeseries(t *testing.T) { return } - _, err = client.UnmarshalTimeseries(testJSON2) - if err == nil { - t.Errorf("expected error: %s", `Must have at least two fields; only have 1`) - return - } - } func TestMarshalTimeseries(t *testing.T) { expectedLen := len(testJSON1) client := &Client{} - bytes, err := client.MarshalTimeseries(testRE1) + bytes, err := client.MarshalTimeseries(testRE) if err != nil { t.Error(err) return } - if !reflect.DeepEqual(testJSON1, bytes) { + if !reflect.DeepEqual([]byte(testJSON1), bytes) { t.Errorf("expected %d got %d", expectedLen, len(bytes)) } -} -func TestUnmarshalJSON(t *testing.T) { + bytes, err = client.MarshalTimeseries(testREStringDate) + if err != nil { + t.Error(err) + return + } + + if string(bytes) != testJSONStringDate { + t.Errorf("Expected %s got %s", testJSONStringDate, string(bytes)) + } + bytes, err = client.MarshalTimeseries(testREUInt32Date) + if err != nil { + t.Error(err) + return + } + + if !reflect.DeepEqual([]byte(testJSONUInt32Date), bytes) { + t.Errorf("got %s", string(bytes)) + } +} + +func TestUnmarshalValidJSON(t *testing.T) { re := ResultsEnvelope{} - err := re.UnmarshalJSON(testJSON1) + err := re.UnmarshalJSON([]byte(testJSON1)) if err != nil { t.Error(err) return @@ -282,64 +215,99 @@ func TestUnmarshalJSON(t *testing.T) { t.Errorf(`expected 4. got %d`, len(re.Meta)) return } - m := re.Meta[2] if m.Name != "meta1" { t.Errorf(`expected meta1 found %s`, m.Name) return } - if len(re.Data) != 3 { t.Errorf(`expected 3. 
got %d`, len(re.Data)) return } - - key := "{meta1=206;meta2=value3}" - v, ok := re.Data[key] - if !ok { - t.Errorf(`expected to find key %s`, key) + if re.Data[0].Values[0]["cnt"] != "12648509" { + t.Errorf(`expected 1 got %s`, re.Data[0].Values[0]["cnt"]) return } - if len(v.Points) != 1 { - t.Errorf(`expected 1 got %d`, len(v.Points)) + err = re.UnmarshalJSON([]byte(testJSONInt32)) + if err != nil { + t.Error(err) return } - - if v.Points[0].Value != 1 { - t.Errorf(`expected 1 got %f`, v.Points[0].Value) + if re.Data[0].Timestamp != time.Unix(1557766080, 0) { + t.Errorf("incorrect timestamp for point %v", re.Data[0]) return } - err = re.UnmarshalJSON(nil) - if err == nil { - t.Errorf("expected error: %s", `unexpected end of JSON input`) + err = re.UnmarshalJSON([]byte(testJSONInt64)) + if err != nil { + t.Error(err) return } - - err = re.UnmarshalJSON(testJSON2) - if err == nil { - t.Errorf("expected error: %s", `Must have at least two fields; only have 1`) + if re.Data[1].Timestamp.Unix() != time.Unix(1557766680, 0).Unix() { + t.Errorf("incorrect timestamp for point %v", re.Data[1]) return } -} + re = ResultsEnvelope{} + err = re.UnmarshalJSON([]byte(testJSONDateTime)) + if err != nil { + t.Error(err) + } -func TestMSToTime(t *testing.T) { - _, err := msToTime("bad") - if err == nil { - t.Errorf("expected error for invalid syntax") + if re.Data[1].Timestamp.Unix() != time.Unix(1557766680, 0).Unix() { + t.Errorf("incorrect timestamp for point %v", re.Data[1]) + return + } + + err = re.UnmarshalJSON([]byte(testEmptyJSON)) + if err != nil { + t.Error(err) + return } } -func TestSortPoints(t *testing.T) { +var testUnknownTimestampJSON = `{"meta":[{"name":"t", "type":"String"},{"name":"cnt", "type":"UInt64"}],` + + `"data":[{"t":"1557766080000","cnt":"12648509","meta1":200,"meta2":"value2"},` + + `{"t":"1557766080000","cnt":"10260032","meta1":200,"meta2":"value3"},` + + `{"t":"1557766080000","cnt":"1","meta1":206,"meta2":"value3"}],"rows":3}` + +var testUnexpectedTimestamp = `{"meta":[{"name":"t", "type":"UInt64"},{"name":"cnt", "type":"UInt64"}],` + + `"data":[{"t":"15577660800000","cnt":"12648509","meta1":200,"meta2":"value2"},` + + `{"t":"1557766080000","cnt":"10260032","meta1":200,"meta2":"value3"},` + + `{"t":true,"cnt":"1","meta1":206,"meta2":"value3"}],"rows":3}` - p := Points{{Timestamp: time.Unix(1, 0), Value: 12}, - {Timestamp: time.Unix(0, 0), Value: 13}, {Timestamp: time.Unix(2, 0), Value: 22}} - sort.Sort(p) +var testMissingTimestampJSON = `{"meta":[{"name":"t", "type":"UInt64"},{"name":"cnt", "type":"UInt64"}],` + + `"data":[{"t":"1557766080000","cnt":"12648509","meta1":200,"meta2":"value2"},` + + `{"bad":"1557766080000","cnt":"10260032","meta1":200,"meta2":"value3"},` + + `{"bad":"1557766080000","cnt":"1","meta1":206,"meta2":"value3"}],"rows":3}` - if p[0].Timestamp.Unix() != 0 { - t.Errorf("expected %d got %d", 0, p[0].Timestamp.Unix()) - } +var testNullTimestampJSON = `{"meta":[{"name":"t", "type":"UInt32"},{"name":"cnt", "type":"UInt64"}],` + + `"data":[{"t":null,"cnt":"12648509","meta1":200,"meta2":"value2"},` + + `{"t":"1557766080000","cnt":"10260032","meta1":200,"meta2":"value3"},` + + `{"t":"1557766080000","cnt":"1","meta1":206,"meta2":"value3"}],"rows":3}` +var testInvalidTimestampJSON = `{"meta":[{"name":"t","type":"UInt64"},{"name":"cnt", "type":"UInt64"}],` + + `"data":[{"t":"1557766080000","cnt":"12648509","meta1":200,"meta2":"value2"},` + + `{"t":"1557766080000bad","cnt":"10260032","meta1":200,"meta2":"value3"},` + + 
`{"t":"1557766080000","cnt":"1","meta1":206,"meta2":"value3"}],"rows":3}` + +func TestUnmarshallBadJSON(t *testing.T) { + test := func(run string, json string, error string) { + t.Run(run, func(t *testing.T) { + re := &ResultsEnvelope{} + err := re.UnmarshalJSON([]byte(json)) + if err == nil { + t.Errorf("Expected error parsing JSON") + } else if err.Error() != error { + t.Errorf("expected error: %s, got: %s", error, err.Error()) + } + }) + } + test("no json data", "", "unexpected end of JSON input") + test("bad timestamp type", testUnknownTimestampJSON, "timestamp field not of recognized type") + test("unexpected timestamp field type", testUnexpectedTimestamp, "timestamp field does not parse to date") + test("null timestamp field", testNullTimestampJSON, "timestamp field does not parse to date") + test("invalid timestamp field", testInvalidTimestampJSON, "timestamp field does not parse to date") + test("missing timestamp field", testMissingTimestampJSON, "missing timestamp field in response data") } diff --git a/pkg/proxy/origins/clickhouse/parsing.go b/pkg/proxy/origins/clickhouse/parsing.go new file mode 100644 index 000000000..f0157ecb6 --- /dev/null +++ b/pkg/proxy/origins/clickhouse/parsing.go @@ -0,0 +1,342 @@ +/* + * Copyright 2018 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package clickhouse + +import ( + "fmt" + ttc "github.com/tricksterproxy/trickster/pkg/proxy/timeconv" + "github.com/tricksterproxy/trickster/pkg/util/regexp/matching" + "regexp" + "strconv" + "strings" + "time" + + "github.com/tricksterproxy/trickster/pkg/timeseries" +) + +// This file handles tokenization of time parameters within ClickHouse queries +// for cache key hashing and delta proxy caching. 
+ +// Tokens for String Interpolation +const ( + tkTimestamp1 = "<$TIMESTAMP1$>" + tkTimestamp2 = "<$TIMESTAMP2$>" +) + +const bQuote = byte('\'') +const bBS = byte('\\') +const bSpace = byte(' ') +const bOpen = byte('(') +const bClose = byte(')') + +var chOperators = map[byte]bool{ + byte('*'): true, + byte('/'): true, + byte(','): true, + byte('+'): true, + byte('-'): true, + byte('>'): true, + byte('<'): true, + byte('='): true, +} + +var timeFuncMap = map[string]string{ + "toStartOfMinute": "1m", + "toStartOfFiveMinute": "5m", + "toStartOfTenMinutes": "10m", + "toStartOfFifteenMinutes": "15m", + "toStartOfHour": "1h", + "toDate": "1d", +} + +var parsingNowProvider = func() int { + return int(time.Now().Unix()) +} + +var reTimeFieldAndStep = regexp.MustCompile(`.*intDiv\(toU?Int32\((?P<timeField>[a-zA-Z0-9._-]+)\),(?P<step>[0-9]+)\)`) + +var srm = func(str, old string) string { + return strings.Replace(str, old, "", -1) +} + +var sup = strings.ToUpper + +func interpolateTimeQuery(template string, extent *timeseries.Extent, step time.Duration) string { + endTime := int(extent.End.Unix()) + int(step.Seconds()) // Add step to normalized end time + return strings.Replace(strings.Replace(template, tkTimestamp1, + strconv.Itoa(int(extent.Start.Unix())), -1), tkTimestamp2, strconv.Itoa(endTime), -1) +} + +func parseRawQuery(query string, trq *timeseries.TimeRangeQuery) error { + var duration string + var err error + parts := findParts(query) + size := len(parts) + // We take advantage of the fact we always have slop at the end of valid queries to avoid checking for + // index out of bounds errors + if size < 4 { + return fmt.Errorf("unrecognized query format") + } + if sup(parts[size-2]+" "+parts[size-1]) != "FORMAT JSON" { + return fmt.Errorf("non JSON formats not supported") + } + + var tsColumn, tsAlias string + var startTime, endTime, whereStart int + var whereClause []string + for i := 0; i < size; i++ { + p := parts[i] + if tsColumn == "" && srm(sup(p), "(") == "SELECT" { + i++ + testCol, testAlias := parts[i], "" + cl := strings.Index(testCol, ",") + if cl > 0 && strings.Index(testCol[cl:], ")") == -1 { + testCol = testCol[:cl] + } else if strings.ToUpper(parts[i+1]) == "AS" { + i += 2 + testAlias = strings.Split(parts[i], ",")[0] + } else { + i++ + testAlias = strings.Split(parts[i], ",")[0] + } + // First look for a Grafana/division type time series query + m := matching.GetNamedMatches(reTimeFieldAndStep, testCol, nil) + if tf, ok := m["timeField"]; ok { + tsColumn, tsAlias = tf, testAlias + duration = m["step"] + "s" + } else { + // Otherwise check for the use of built-in ClickHouse time grouping functions + for k, v := range timeFuncMap { + fi := strings.Index(testCol, k+"(") + if fi > -1 { + cp := strings.Index(testCol[fi+1:], ")") + if cp == -1 { + return fmt.Errorf("invalid time function syntax") + } + tsColumn, tsAlias = testCol[fi+len(k)+1:fi+cp+1], testAlias + duration = v + break + } + } + } + } + if tsColumn != "" && (sup(parts[i]) == "PREWHERE" || sup(parts[i]) == "WHERE") { + startTime, endTime, whereClause, tsColumn, err = findRange(parts[i+1:], tsColumn, tsAlias) + if err != nil { + return err + } + if startTime > 0 { + whereStart = i + break + } + } + } + + if tsColumn == "" { + return fmt.Errorf("no matching time value column found") + } + if startTime == 0 { + return fmt.Errorf("no time range found") + } + + trq.Step, _ = ttc.ParseDuration(duration) + trq.Statement = strings.Join(parts[:whereStart+1], " ") + " " + strings.Join(whereClause, " ") + trq.Extent.Start = 
time.Unix(int64(startTime), 0) + trq.TimestampFieldName = tsColumn + + bf := trq.BackfillTolerance + + now := parsingNowProvider() + if endTime == 0 { + endTime = now + } + + step := int(trq.Step.Seconds()) + + norm := now / step * step + if endTime > norm { + // Pad out endTime if we are in the "now" bucket so the last extent is not truncated + endTime = norm + step + bft := time.Duration(now-norm) * time.Second + if bft > bf { + bf = bft + } + } else { + // Reduce backfill tolerance to nothing if we're well outside the window + etNorm := endTime / step * step + diff := time.Duration(now-etNorm) * time.Second + nbf := bf - diff + if nbf < bf { + bf = nbf + } + } + + trq.BackfillTolerance = bf + trq.Extent.End = time.Unix(int64(endTime), 0) + return nil +} + +func parseTime(ts string) (int, error) { + if strings.HasPrefix(ts, "now(") { + now := parsingNowProvider() + if len(ts) > 6 && ts[4] == '-' { + sub := 1 + for _, ms := range strings.Split(ts[5:], "*") { + m, err := strconv.Atoi(ms) + if err != nil { + return 0, err + } + sub *= m + } + return now - sub, nil + } + return now, nil + } + t, err := strconv.Atoi(ts) + if err == nil { + return t, nil + } + td, err := fromDateString(srm(ts, "'")) + if err == nil { + return int(td.Unix()), nil + } + return 0, err +} + +func findRange(parts []string, column string, alias string) (int, int, []string, string, error) { + var err error + var st, et int + var actColumn string + size := len(parts) + var wc = make([]string, 0, size) + for i := 0; i < size; i++ { + p := parts[i] + if strings.ToUpper(p) == "BETWEEN" { + f := parts[i-1] + if f == column || f == alias { + actColumn = f + i++ + ts := srm(srm(srm(parts[i], "toDateTime("), "toDate("), ")") + st, err = parseTime(ts) + if err != nil { + return st, et, nil, column, err + } + i++ + if sup(parts[i]) != "AND" { + return st, et, nil, column, fmt.Errorf("unrecognized between clause") + } + i++ + ts = srm(srm(srm(parts[i], "toDateTime("), "toDate("), ")") + et, err = parseTime(ts) + if err != nil { + return st, et, nil, column, err + } + wc = wc[:len(wc)-1] // Remove column name before BETWEEN + wc = append(wc, "("+actColumn+" >= "+tkTimestamp1+" AND "+actColumn+" < "+tkTimestamp2+") ") + wc = append(wc, parts[i+1:]...) 
+ return st, et, wc, actColumn, nil + } + } + + tf := srm(srm(srm(p, "toDateTime("), "toDate("), ")") + tfSize := len(tf) + tl := strings.Index(tf, column) + if tl == 0 { + actColumn = column + } else { + tl = strings.Index(tf, alias) + if tl == 0 { + actColumn = alias + } else { + wc = append(wc, p) + continue + } + } + + tl = len(actColumn) + if tl < tfSize && tf[tl] == '>' { + if tl < tfSize+1 && tf[tl+1] == '=' { + tl++ + } + st, err = parseTime(tf[tl+1:]) + if err != nil { + return st, et, nil, column, err + } + wc = append(wc, actColumn+" >= "+tkTimestamp1) + } else if tl < tfSize && tf[tl] == '<' { + if tl < tfSize+1 && tf[tl+1] == '=' { + tl++ + } + et, err = parseTime(tf[tl+1:]) + if err != nil { + return st, et, nil, column, err + } + wc = append(wc, actColumn+" < "+tkTimestamp2) + } else { + wc = append(wc, p) + } + } + if st == 0 { + return 0, 0, nil, column, nil + } + return st, et, wc, actColumn, nil +} + +func findParts(query string) []string { + bytes := []byte(strings.TrimSpace(query)) + size := len(bytes) + buf := make([]byte, 0, size) + inQuote := false + escaped := false + trimming := false + fields := make([]string, 0, 30) + fieldStart := 0 + for i := 0; i < size; i++ { + b := bytes[i] + if inQuote { + if b == bQuote && !escaped { + inQuote = false + } + escaped = !escaped && b == bBS + buf = append(buf, b) + continue + } + if b == bSpace { + if trimming { + continue + } + for i++; i < size; i++ { + b = bytes[i] + if b == bClose || chOperators[b] { + break + } + if b != bSpace { + fields = append(fields, string(buf[fieldStart:])) + buf = append(buf, bSpace) + fieldStart = len(buf) + break + } + } + } + if b == bQuote { + inQuote = true + } + trimming = b == bOpen || chOperators[b] + buf = append(buf, b) + } + return append(fields, string(buf[fieldStart:])) +} diff --git a/pkg/proxy/origins/clickhouse/parsing_test.go b/pkg/proxy/origins/clickhouse/parsing_test.go new file mode 100644 index 000000000..2e1dde4c7 --- /dev/null +++ b/pkg/proxy/origins/clickhouse/parsing_test.go @@ -0,0 +1,163 @@ +/* + * Copyright 2018 Comcast Cable Communications Management, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package clickhouse + +import ( + "github.com/tricksterproxy/trickster/pkg/timeseries" + "testing" + "time" +) + +func testNow() int { + t, _ := time.Parse(chLayout, "2020-06-01 12:02:00") + return int(t.Unix()) +} + +func TestFindParts(t *testing.T) { + query := `WITH 'igor * 31 + \' dks( k )' as igor, 3600 as x SELECT ( intDiv(toUInt32(datetime), x) * x) * 1000 AS t,` + + ` count() as cnt FROM comcast_ott_maple.atsec_chi WHERE datetime >= 1589904000 AND datetime < 1589997600)` + + ` GROUP BY t ORDER BY t DESC FORMAT JSON` + parts := findParts(query) + if len(parts) != 27 { + t.Errorf("Find parts returned %d parts, expected %d", len(parts), 27) + } +} + +func TestGoodQueries(t *testing.T) { + query := `SELECT ( intDiv(toUInt32(datetime), 300) * 300) * 1000 AS t,` + + ` count() as cnt FROM test_db.test_table WHERE datetime between 1589904000 AND 1589997600` + + ` GROUP BY t ORDER BY t DESC FORMAT JSON` + trq := &timeseries.TimeRangeQuery{} + err := parseRawQuery(query, trq) + if err != nil { + t.Error(err) + } + if trq.Step != 300*time.Second { + t.Errorf("Step of %d did not match 300 seconds", trq.Step) + } + + if trq.Extent.Start != time.Unix(int64(1589904000), 0) { + t.Errorf("Expected start time of 1589904000, got %d", trq.Extent.Start.Unix()) + } + trq = &timeseries.TimeRangeQuery{} + query = `SELECT toStartOfFiveMinute(datetime) AS t, count() as cnt FROM test_db.test_table WHERE t > ` + + `'2020-05-30 11:00:00' AND t < now() - 300 FORMAT JSON` + err = parseRawQuery(query, trq) + if err != nil { + t.Error(err) + } + if trq.Step != 300*time.Second { + t.Errorf("Step of %d did not match 300 seconds", trq.Step) + } + + trq = &timeseries.TimeRangeQuery{} + query = `WITH dictGetString('test_cache', server, xxHash64(server)) as server_name ` + + `SELECT toStartOfFiveMinute(datetime) AS t, count() as cnt FROM test_db.test_table WHERE t > ` + + `'2020-05-30 11:00:00' AND t < now() - 300 FORMAT JSON` + err = parseRawQuery(query, trq) + if err != nil { + t.Error(err) + } + if trq.Step != 300*time.Second { + t.Errorf("Step of %d did not match 300 seconds", trq.Step) + } + if trq.Statement != `WITH dictGetString('test_cache',server,xxHash64(server)) as server_name `+ + `SELECT toStartOfFiveMinute(datetime) AS t,count() as cnt `+ + `FROM test_db.test_table WHERE t >= <$TIMESTAMP1$> AND t < <$TIMESTAMP2$> FORMAT JSON` { + t.Errorf("Tokenized statement did not match query") + } + + trq = &timeseries.TimeRangeQuery{} + query = `SELECT toInt32(toStartOfFiveMinute(datetime)) AS t, count() as cnt FROM test_db.test_table WHERE datetime > ` + + `'2020-05-30 11:00:00' AND datetime < now() - 300 FORMAT JSON` + err = parseRawQuery(query, trq) + if err != nil { + t.Error(err) + } + if trq.Step != 300*time.Second { + t.Errorf("Step of %d did not match 300 seconds", trq.Step) + } + +} + +func TestBadQueries(t *testing.T) { + test := func(run string, query string, es string) { + t.Run(run, func(t *testing.T) { + trq := &timeseries.TimeRangeQuery{} + err := parseRawQuery(query, trq) + if err == nil { + t.Errorf("Expected err parsing time query") + } else if err.Error() != es { + t.Errorf("Expected error \"%s\", got \"%s\"", es, err.Error()) + } + }) + } + + test("Query too short", "SELECT too short", "unrecognized query format") + test("Query not JSON format", "SELECT toStartOfMinute(datetime), cnt FROM test_table FORMAT TSV", + "non JSON formats not supported") + test("Bad time function", "WITH 300 as t SELECT toStartOfTenMinutes(datetime, cnt FROM "+ + "test_table FORMAT JSON", "invalid time function syntax") 
+ test("Not valid time series", "SELECT a, b FROM test_table FORMAT JSON", "no matching time value column found") + test("No range on time column", "SELECT toDate(datetime) t, cnt FROM test_table WHERE cnt > 100 FORMAT JSON", + "no time range found") + test("Weird between clause", "SELECT toDate(datetime) as t, cnt FROM test_table WHERE t BETWEEN 15002 15003 FORMAT JSON", + "unrecognized between clause") + test("Invalid start time", "SELECT toDate(datetime), cnt FROM test_table WHERE datetime BETWEEN November AND December FORMAT JSON", + `parsing time "November" as "2006-01-02 15:04:05": cannot parse "November" as "2006"`) + test("Invalid end time", "SELECT toDate(datetime), cnt FROM test_table WHERE datetime BETWEEN now() AND December FORMAT JSON", + `parsing time "December" as "2006-01-02 15:04:05": cannot parse "December" as "2006"`) + test("Invalid start time", "SELECT toDate(datetime), cnt FROM test_table WHERE datetime>='November' AND datetime <=now() FORMAT JSON", + `parsing time "November" as "2006-01-02 15:04:05": cannot parse "November" as "2006"`) + test("Invalid end time", "SELECT toDate(datetime), cnt FROM test_table WHERE datetime > '2020-10-15 00:22:00' AND datetime <'December' FORMAT JSON", + `parsing time "December" as "2006-01-02 15:04:05": cannot parse "December" as "2006"`) + test("Weird now expression", "SELECT toDate(datetime), cnt FROM test_table WHERE datetime > '2020-10-15 00:22:00' AND datetime = '2020-06-01 11:00:00'` + + " FORMAT JSON" + test("Backfill from now should be at least configured value", 180, query, 180) + query = `select intDiv(toInt32(datetime), 300) * 300 as t, sum(cnt) FROM testTable WHERE datetime >= '2020-06-01 11:00:00' ` + + ` and datetime <= '2020-06-01 12:02:00' FORMAT JSON` + test("Backfill from now bucket should be at least to prior bucket value", 60, query, 120) + query = `select intDiv(toInt32(datetime), 20) * 20 as t, sum(cnt) FROM testTable WHERE datetime >= '2020-06-01 11:00:00' ` + + ` and datetime <= '2020-06-01 12:01:00' FORMAT JSON` + test("Backfill should be at least now - configured value value", 180, query, 120) + query = `select intDiv(toInt32(datetime), 20) * 20 as t, sum(cnt) FROM testTable WHERE datetime >= '2020-06-01 11:00:00' ` + + ` and datetime <= '2020-06-01 11:50:00' FORMAT JSON` + test("Backfill should be negative/ignored if too far back", 180, query, -540) +} diff --git a/pkg/proxy/origins/clickhouse/routes.go b/pkg/proxy/origins/clickhouse/routes.go index 2c392dfd7..4df535ba6 100644 --- a/pkg/proxy/origins/clickhouse/routes.go +++ b/pkg/proxy/origins/clickhouse/routes.go @@ -27,7 +27,7 @@ import ( func (c *Client) registerHandlers() { c.handlersRegistered = true c.handlers = make(map[string]http.Handler) - // This is the registry of handlers that Trickster supports for InfluxDB, + // This is the registry of handlers that Trickster supports for ClickHouse, // and are able to be referenced by name (map key) in Config Files c.handlers["health"] = http.HandlerFunc(c.HealthHandler) c.handlers["query"] = http.HandlerFunc(c.QueryHandler) diff --git a/pkg/proxy/origins/clickhouse/series.go b/pkg/proxy/origins/clickhouse/series.go index c17a98d6b..c8b9d3e50 100644 --- a/pkg/proxy/origins/clickhouse/series.go +++ b/pkg/proxy/origins/clickhouse/series.go @@ -18,8 +18,6 @@ package clickhouse import ( "sort" - "sync" - "sync/atomic" "time" "github.com/tricksterproxy/trickster/pkg/sort/times" @@ -36,33 +34,13 @@ func (re *ResultsEnvelope) SetStep(step time.Duration) { re.StepDuration = step } -// Merge merges the provided 
Timeseries list into the base Timeseries (in the order provided) +// Merges the provided Timeseries list into the base Timeseries (in the order provided) // and optionally sorts the merged Timeseries func (re *ResultsEnvelope) Merge(sort bool, collection ...timeseries.Timeseries) { - - wg := sync.WaitGroup{} - mtx := sync.Mutex{} - for _, ts := range collection { if ts != nil { re2 := ts.(*ResultsEnvelope) - for k, s := range re2.Data { - wg.Add(1) - go func(l string, d *DataSet) { - mtx.Lock() - if _, ok := re.Data[l]; !ok { - re.Data[l] = d - mtx.Unlock() - wg.Done() - return - } - re.Data[l].Points = append(re.Data[l].Points, d.Points...) - mtx.Unlock() - wg.Done() - }(k, s) - } - wg.Wait() - re.mergeSeriesOrder(re2.SeriesOrder) + re.Data = append(re.Data, re2.Data...) re.ExtentList = append(re.ExtentList, re2.ExtentList...) } } @@ -75,51 +53,7 @@ func (re *ResultsEnvelope) Merge(sort bool, collection ...timeseries.Timeseries) } } -func (re *ResultsEnvelope) mergeSeriesOrder(so2 []string) { - - if len(so2) == 0 { - return - } - - if len(re.SeriesOrder) == 0 { - re.SeriesOrder = so2 - return - } - - so1 := make([]string, len(re.SeriesOrder), len(re.SeriesOrder)+len(so2)) - copy(so1, re.SeriesOrder) - adds := make([]string, 0, len(so2)) - added := make(map[string]bool) - - for _, n := range so2 { - if _, ok := re.Data[n]; !ok { - if _, ok2 := added[n]; !ok2 { - adds = append(adds, n) - added[n] = true - } - continue - } - - if len(adds) > 0 { - for i, v := range so1 { - if v == n { - adds = append(adds, so1[i:]...) - so1 = append(so1[0:i], adds...) - } - } - adds = adds[:0] - } - } - - if len(adds) > 0 { - so1 = append(so1, adds...) - } - - re.SeriesOrder = so1 - -} - -// Clone returns a perfect copy of the base Timeseries +// Returns a perfect copy of the base Timeseries func (re *ResultsEnvelope) Clone() timeseries.Timeseries { re2 := &ResultsEnvelope{ isCounted: re.isCounted, @@ -127,22 +61,14 @@ func (re *ResultsEnvelope) Clone() timeseries.Timeseries { StepDuration: re.StepDuration, } - wg := sync.WaitGroup{} - mtx := sync.Mutex{} - - if re.SeriesOrder != nil { - re2.SeriesOrder = make([]string, len(re.SeriesOrder)) - copy(re2.SeriesOrder, re.SeriesOrder) - } - if re.ExtentList != nil { re2.ExtentList = make(timeseries.ExtentList, len(re.ExtentList)) copy(re2.ExtentList, re.ExtentList) } - if re.tslist != nil { - re2.tslist = make(times.Times, len(re.tslist)) - copy(re2.tslist, re.tslist) + if re.tsList != nil { + re2.tsList = make(times.Times, len(re.tsList)) + copy(re2.tsList, re.tsList) } if re.Meta != nil { @@ -150,48 +76,27 @@ func (re *ResultsEnvelope) Clone() timeseries.Timeseries { copy(re2.Meta, re.Meta) } - if re.Serializers != nil { - re2.Serializers = make(map[string]func(interface{})) - wg.Add(1) - go func() { - for k, s := range re.Serializers { - re2.Serializers[k] = s - } - wg.Done() - }() - } - if re.timestamps != nil { re2.timestamps = make(map[time.Time]bool) for k, v := range re.timestamps { - wg.Add(1) - go func(t time.Time, b bool) { - mtx.Lock() - re2.timestamps[t] = b - mtx.Unlock() - wg.Done() - }(k, v) + re2.timestamps[k] = v } } if re.Data != nil { - re2.Data = make(map[string]*DataSet) - wg.Add(1) - go func() { - for k, ds := range re.Data { - ds2 := &DataSet{Metric: make(map[string]interface{})} - for l, v := range ds.Metric { - ds2.Metric[l] = v + re2.Data = make([]Point, 0) + for _, p1 := range re.Data { + p2 := Point{Timestamp: p1.Timestamp} + for _, rv := range p1.Values { + pm := ResponseValue{} + for k, v := range rv { + pm[k] = v } - ds2.Points = 
ds.Points[:] - re2.Data[k] = ds2 + p2.Values = append(p2.Values, pm) } - wg.Done() - }() + re2.Data = append(re2.Data, p2) + } } - - wg.Wait() - return re2 } @@ -204,7 +109,7 @@ func (re *ResultsEnvelope) CropToSize(sz int, t time.Time, lur timeseries.Extent x := len(re.ExtentList) // The Series has no extents, so no need to do anything if x < 1 { - re.Data = make(map[string]*DataSet) + re.Data = make([]Point, 0) re.ExtentList = timeseries.ExtentList{} return } @@ -221,7 +126,7 @@ func (re *ResultsEnvelope) CropToSize(sz int, t time.Time, lur timeseries.Extent return } - rc := tc - sz // # of required timestamps we must delete to meet the rentention policy + rc := tc - sz // # of required timestamps we must delete to meet the retention policy removals := make(map[time.Time]bool) done := false var ok bool @@ -238,25 +143,13 @@ func (re *ResultsEnvelope) CropToSize(sz int, t time.Time, lur timeseries.Extent } } - wg := sync.WaitGroup{} - mtx := sync.Mutex{} - - for _, s := range re.Data { - tmp := s.Points[:0] - for _, r := range s.Points { - wg.Add(1) - go func(p Point) { - mtx.Lock() - if _, ok := removals[p.Timestamp]; !ok { - tmp = append(tmp, p) - } - mtx.Unlock() - wg.Done() - }(r) + tmp := make([]Point, 0, len(re.Data)-len(removals)) + for _, p := range re.Data { + if _, ok := removals[p.Timestamp]; !ok { + tmp = append(tmp, p) } - wg.Wait() - s.Points = tmp } + re.Data = tmp tl := times.FromMap(removals) sort.Sort(tl) @@ -268,7 +161,6 @@ func (re *ResultsEnvelope) CropToSize(sz int, t time.Time, lur timeseries.Extent } } } - wg.Wait() re.ExtentList = timeseries.ExtentList(el).Compress(re.StepDuration) re.Sort() @@ -278,17 +170,10 @@ func (re *ResultsEnvelope) CropToSize(sz int, t time.Time, lur timeseries.Extent // CropToRange assumes the base Timeseries is already sorted, and will corrupt an unsorted Timeseries func (re *ResultsEnvelope) CropToRange(e timeseries.Extent) { re.isCounted = false - x := len(re.ExtentList) - // The Series has no extents, so no need to do anything - if x < 1 { - re.Data = make(map[string]*DataSet) - re.ExtentList = timeseries.ExtentList{} - return - } - // if the extent of the series is entirely outside the extent of the crop range, return empty set and bail - if re.ExtentList.OutsideOf(e) { - re.Data = make(map[string]*DataSet) + // The Series has no extents, or is outside of the crop range, so no need to do anything + if len(re.ExtentList) < 1 || re.ExtentList.OutsideOf(e) { + re.Data = make([]Point, 0) re.ExtentList = timeseries.ExtentList{} return } @@ -296,7 +181,7 @@ func (re *ResultsEnvelope) CropToRange(e timeseries.Extent) { // if the series extent is entirely inside the extent of the crop range, simply adjust down its ExtentList if re.ExtentList.InsideOf(e) { if re.ValueCount() == 0 { - re.Data = make(map[string]*DataSet) + re.Data = make([]Point, 0) } re.ExtentList = re.ExtentList.Crop(e) return @@ -307,51 +192,41 @@ func (re *ResultsEnvelope) CropToRange(e timeseries.Extent) { return } - deletes := make(map[string]bool) - - for i, s := range re.Data { - start := -1 - end := -1 - for j, val := range s.Points { - t := val.Timestamp - if t.Equal(e.End) { - // for cases where the first element is the only qualifying element, - // start must be incremented or an empty response is returned - if j == 0 || t.Equal(e.Start) || start == -1 { - start = j - } - end = j + 1 - break - } - if t.After(e.End) { - end = j - break - } - if t.Before(e.Start) { - continue - } - if start == -1 && (t.Equal(e.Start) || (e.End.After(t) && t.After(e.Start))) { + start 
:= -1 + end := -1 + for j, val := range re.Data { + t := val.Timestamp + if t.Equal(e.End) { + // for cases where the first element is the only qualifying element, + // start must be incremented or an empty response is returned + if j == 0 || t.Equal(e.Start) || start == -1 { start = j } + end = j + 1 + break } - if start != -1 && len(s.Points) > 0 { - if end == -1 { - end = len(s.Points) - } - re.Data[i].Points = s.Points[start:end] - } else { - deletes[i] = true + if t.After(e.End) { + end = j + break + } + if t.Before(e.Start) { + continue + } + if start == -1 && (t.Equal(e.Start) || (e.End.After(t) && t.After(e.Start))) { + start = j } } - - for i := range deletes { - delete(re.Data, i) + if start != -1 && len(re.Data) > 0 { + if end == -1 { + end = len(re.Data) + } + re.Data = re.Data[start:end] } re.ExtentList = re.ExtentList.Crop(e) } -// Sort sorts all Values in each Series chronologically by their timestamp +// Sorts all Points chronologically by their timestamp func (re *ResultsEnvelope) Sort() { if re.isSorted || len(re.Data) == 0 { @@ -359,65 +234,39 @@ func (re *ResultsEnvelope) Sort() { } tsm := map[time.Time]bool{} - wg := sync.WaitGroup{} - mtx := sync.Mutex{} - - for i, s := range re.Data { - m := make(map[time.Time]Point) - keys := make(times.Times, 0, len(s.Points)) - for _, v := range s.Points { - wg.Add(1) - go func(sp Point) { - mtx.Lock() - if _, ok := m[sp.Timestamp]; !ok { - keys = append(keys, sp.Timestamp) - m[sp.Timestamp] = sp - } - tsm[sp.Timestamp] = true - mtx.Unlock() - wg.Done() - }(v) - } - wg.Wait() - sort.Sort(keys) - sm := make(Points, 0, len(keys)) - for _, key := range keys { - sm = append(sm, m[key]) + m := make(map[time.Time]Point) + keys := make(times.Times, 0, len(re.Data)) + for _, v := range re.Data { + if _, ok := m[v.Timestamp]; !ok { + keys = append(keys, v.Timestamp) + m[v.Timestamp] = v } - re.Data[i].Points = sm + tsm[v.Timestamp] = true } - + sort.Sort(keys) + sm := make([]Point, 0, len(keys)) + for _, key := range keys { + sm = append(sm, m[key]) + } + re.Data = sm sort.Sort(re.ExtentList) re.timestamps = tsm - re.tslist = times.FromMap(tsm) + re.tsList = times.FromMap(tsm) re.isCounted = true re.isSorted = true } func (re *ResultsEnvelope) updateTimestamps() { - - wg := sync.WaitGroup{} - mtx := sync.Mutex{} - if re.isCounted { return } m := make(map[time.Time]bool) - for _, s := range re.Data { - for _, v := range s.Points { - wg.Add(1) - go func(t time.Time) { - mtx.Lock() - m[t] = true - mtx.Unlock() - wg.Done() - }(v.Timestamp) - } + for _, p := range re.Data { + m[p.Timestamp] = true } - wg.Wait() re.timestamps = m - re.tslist = times.FromMap(m) + re.tsList = times.FromMap(m) re.isCounted = true } @@ -438,63 +287,28 @@ func (re *ResultsEnvelope) TimestampCount() int { return len(re.timestamps) } -// SeriesCount returns the number of individual Series in the Timeseries object -func (re *ResultsEnvelope) SeriesCount() int { - return len(re.Data) -} - // ValueCount returns the count of all values across all Series in the Timeseries object func (re *ResultsEnvelope) ValueCount() int { - c := 0 - wg := sync.WaitGroup{} - mtx := sync.Mutex{} - for i := range re.Data { - wg.Add(1) - go func(j int) { - mtx.Lock() - c += j - mtx.Unlock() - wg.Done() - }(len(re.Data[i].Points)) - } - wg.Wait() - return c + return len(re.Data) } // Size returns the approximate memory utilization in bytes of the timeseries func (re *ResultsEnvelope) Size() int { - wg := sync.WaitGroup{} - c := uint64(24 + // .stepDuration - (25 * len(re.timestamps)) + // 
time.Time (24) + bool(1) - (24 * len(re.tslist)) + // time.Time (24) - (len(re.ExtentList) * 72) + // time.Time (24) * 3 - 2, // .isSorted + .isCounted - ) - for i := range re.Meta { - wg.Add(1) - go func(j int) { - atomic.AddUint64(&c, uint64(len(re.Meta[j].Name)+len(re.Meta[j].Type))) - wg.Done() - }(i) + var size int + for _, m := range re.Meta { + size += len(m.Name) + len(m.Type) } - for _, s := range re.SeriesOrder { - wg.Add(1) - go func(t string) { - atomic.AddUint64(&c, uint64(len(t))) - wg.Done() - }(s) - } - for k, v := range re.Data { - atomic.AddUint64(&c, uint64(len(k))) - wg.Add(1) - go func(d *DataSet) { - atomic.AddUint64(&c, uint64(len(d.Points)*32)) - for mk := range d.Metric { - atomic.AddUint64(&c, uint64(len(mk)+8)) // + approx len of value (interface) + + for _, p := range re.Data { + size += 8 // Timestamp guess + for _, rv := range p.Values { + for k2 := range rv { + size += len(k2) + 16 // Key length + values guess } - wg.Done() - }(v) + } } - wg.Wait() - return int(c) + + // ExtentList + StepDuration + Timestamps + Times + isCounted + isSorted + size += (len(re.ExtentList) * 24) + 8 + (len(re.timestamps) * 9) + (len(re.tsList) * 8) + 2 + return size } diff --git a/pkg/proxy/origins/clickhouse/series_test.go b/pkg/proxy/origins/clickhouse/series_test.go index fc02d1968..1e7326519 100644 --- a/pkg/proxy/origins/clickhouse/series_test.go +++ b/pkg/proxy/origins/clickhouse/series_test.go @@ -17,16 +17,17 @@ package clickhouse import ( + "github.com/tricksterproxy/trickster/pkg/timeseries" "reflect" - "strconv" - "strings" "testing" "time" - - "github.com/tricksterproxy/trickster/pkg/sort/times" - "github.com/tricksterproxy/trickster/pkg/timeseries" ) +const testStep = time.Duration(10) * time.Second + +var before, after *ResultsEnvelope +var extent timeseries.Extent + func TestSetStep(t *testing.T) { re := ResultsEnvelope{} const step = time.Duration(300) * time.Minute @@ -45,1279 +46,200 @@ func TestStep(t *testing.T) { } } +func testRe() *ResultsEnvelope { + return newRe().addMeta("t", "UInt64", "v", "Float") +} + +func testEx(start int64, end int64) timeseries.Extent { + return timeseries.Extent{Start: time.Unix(start, 0), End: time.Unix(end, 0)} +} + +// Multi-series tests omitted for single series ClickHouse func TestMerge(t *testing.T) { - tests := []struct { - a, b, merged *ResultsEnvelope - }{ - // Run 0: Series that adhere to rule - { - a: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(10, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10, 0), End: time.Unix(10, 0)}, - }, - StepDuration: time.Duration(5) * time.Second, - }, - b: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(5, 0), Value: 1.5}, - {Timestamp: time.Unix(15, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(5, 0), End: time.Unix(5, 0)}, - timeseries.Extent{Start: time.Unix(15, 0), End: time.Unix(15, 0)}, - }, - StepDuration: time.Duration(5) * time.Second, - }, - merged: &ResultsEnvelope{ - isCounted: true, - isSorted: true, - tslist: times.Times{time.Unix(5, 0), time.Unix(10, 0), time.Unix(15, 0)}, - timestamps: map[time.Time]bool{time.Unix(5, 0): true, time.Unix(10, 0): true, time.Unix(15, 0): true}, - Data: map[string]*DataSet{ - "a": { - Metric: 
map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(5, 0), Value: 1.5}, - {Timestamp: time.Unix(10, 0), Value: 1.5}, - {Timestamp: time.Unix(15, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(5, 0), End: time.Unix(15, 0)}, - }, - StepDuration: time.Duration(5) * time.Second, - }, - }, - // Run 1: Empty second series - { - a: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(10000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - b: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{}, - }, - }, - ExtentList: timeseries.ExtentList{}, - StepDuration: time.Duration(5000) * time.Second, - }, - merged: &ResultsEnvelope{ - isCounted: true, - isSorted: true, - tslist: times.Times{time.Unix(10000, 0)}, - timestamps: map[time.Time]bool{time.Unix(10000, 0): true}, - Data: map[string]*DataSet{ - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(10000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - }, - // Run 2: second series has new metric - { - a: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(10000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - b: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(15000, 0), End: time.Unix(15000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - merged: &ResultsEnvelope{ - isCounted: true, - isSorted: true, - tslist: times.Times{time.Unix(10000, 0), time.Unix(15000, 0)}, - timestamps: map[time.Time]bool{time.Unix(10000, 0): true, time.Unix(15000, 0): true}, - Data: map[string]*DataSet{ - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(15000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - }, - // Run 3: merge one metric, one metric unchanged - { - a: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(10000, 0)}, - }, - StepDuration: 
time.Duration(5000) * time.Second, - }, - b: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(15000, 0), End: time.Unix(15000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - merged: &ResultsEnvelope{ - isCounted: true, - isSorted: true, - tslist: times.Times{time.Unix(10000, 0), time.Unix(15000, 0)}, - timestamps: map[time.Time]bool{time.Unix(10000, 0): true, time.Unix(15000, 0): true}, - Data: map[string]*DataSet{ - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(15000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - }, - // Run 4: merge multiple extents - { - a: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(15000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - b: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(30000, 0), Value: 1.5}, - {Timestamp: time.Unix(35000, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(30000, 0), Value: 1.5}, - {Timestamp: time.Unix(35000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(30000, 0), End: time.Unix(35000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - merged: &ResultsEnvelope{ - isCounted: true, - isSorted: true, - tslist: times.Times{time.Unix(10000, 0), time.Unix(15000, 0), - time.Unix(30000, 0), time.Unix(35000, 0)}, - timestamps: map[time.Time]bool{time.Unix(10000, 0): true, - time.Unix(15000, 0): true, time.Unix(30000, 0): true, time.Unix(35000, 0): true}, - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - {Timestamp: time.Unix(30000, 0), Value: 1.5}, - {Timestamp: time.Unix(35000, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - {Timestamp: time.Unix(30000, 0), Value: 1.5}, - {Timestamp: time.Unix(35000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(15000, 0)}, - timeseries.Extent{Start: time.Unix(30000, 0), End: time.Unix(35000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, 
- }, - }, - // - // - // Run 5: merge with some overlapping extents - { - a: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(15000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - b: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - {Timestamp: time.Unix(20000, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - {Timestamp: time.Unix(20000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(15000, 0), End: time.Unix(20000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - merged: &ResultsEnvelope{ - isCounted: true, - isSorted: true, - tslist: times.Times{time.Unix(10000, 0), time.Unix(15000, 0), time.Unix(20000, 0)}, - timestamps: map[time.Time]bool{time.Unix(10000, 0): true, - time.Unix(15000, 0): true, time.Unix(20000, 0): true}, - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - {Timestamp: time.Unix(20000, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(10000, 0), Value: 1.5}, - {Timestamp: time.Unix(15000, 0), Value: 1.5}, - {Timestamp: time.Unix(20000, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(10000, 0), End: time.Unix(20000, 0)}, - }, - StepDuration: time.Duration(5000) * time.Second, - }, - }, - } - for i, test := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - test.a.Merge(true, test.b) - if !reflect.DeepEqual(test.merged, test.a) { - t.Errorf("mismatch\n actual=%v\nexpected=%v", test.a, test.merged) + var a, b, merged *ResultsEnvelope + test := func(run string) { + t.Run(run, func(t *testing.T) { + merged.Sort() + a.Merge(true, b) + if !reflect.DeepEqual(merged, a) { + t.Errorf("mismatch\n actual=%v\nexpected=%v", a, merged) } }) } + + a = testRe().addPoint(10, 1.5).addExtent(10).setStep("5s") + b = testRe().addPoint(5, 1.5).addPoint(15, 1.5).addExtent(5).addExtent(15).setStep("5s") + merged = testRe().addPoint(5, 1.5).addPoint(10, 1.5).addPoint(15, 1.5).addExtents(5, 15).setStep("5s") + test("Series that adhere to rule") + + a = testRe().addPoint(1000, 1.5).addExtent(10000).setStep("5000s") + b = testRe().setStep("5000s") + merged = testRe().addPoint(1000, 1.5).addExtent(10000).setStep("5000s") + test("Empty second series") + + a = testRe().addPoint(10000, 1.5).addPoint(15000, 1.5).addExtents(10000, 15000).setStep("5000s") + b = testRe().addPoint(30000, 1.5).addPoint(35000, 1.5).addExtents(30000, 35000).setStep("5000s") + merged = testRe().addPoint(10000, 1.5).addPoint(15000, 1.5).addPoint(30000, 1.5).addPoint(35000, 1.5). 
+ addExtents(10000, 15000).addExtents(30000, 35000).setStep("5000s") + test("Merge multiple extends") + + a = testRe().addPoint(10000, 1.5).addPoint(15000, 1.5).addExtents(10000, 15000).setStep("5000s") + b = testRe().addPoint(15000, 1.5).addPoint(20000, 1.5).addExtents(15000, 20000).setStep("5000s") + merged = testRe().addPoint(10000, 1.5).addPoint(15000, 1.5).addPoint(20000, 1.5). + addExtents(10000, 20000).setStep("5000s") + test("Merge with some overlapping extents") } +// Multi-series tests omitted for single series ClickHouse func TestCropToRange(t *testing.T) { - tests := []struct { - before, after *ResultsEnvelope - extent timeseries.Extent - }{ - // Run 0: Case where the very first element in the matrix has a timestamp matching the extent's end - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1644004600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1644004600, 0), End: time.Unix(1644004600, 0)}, - }, - StepDuration: testStep, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1644004600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1644004600, 0), End: time.Unix(1644004600, 0)}, - }, - StepDuration: testStep, - }, - extent: timeseries.Extent{ - Start: time.Unix(0, 0), - End: time.Unix(1644004600, 0), - }, - }, - // Run 1: Case where we trim nothing - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1544004600, 0), End: time.Unix(1544004600, 0)}, - }, - StepDuration: testStep, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1544004600, 0), End: time.Unix(1544004600, 0)}, - }, - StepDuration: testStep, - }, - extent: timeseries.Extent{ - Start: time.Unix(0, 0), - End: time.Unix(1644004600, 0), - }, - }, - // Run 2: Case where we trim everything (all data is too late) - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1544004600, 0), End: time.Unix(1544004600, 0)}, - }, - StepDuration: testStep, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{}, - ExtentList: timeseries.ExtentList{}, - StepDuration: testStep, - }, - extent: timeseries.Extent{ - Start: time.Unix(0, 0), - End: time.Unix(10, 0), - }, - }, - // Run 3: Case where we trim everything (all data is too early) - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(100, 0), End: time.Unix(100, 0)}, - }, - StepDuration: testStep, - }, - after: &ResultsEnvelope{ - Data: 
map[string]*DataSet{}, - ExtentList: timeseries.ExtentList{}, - StepDuration: testStep, - }, - extent: timeseries.Extent{ - Start: time.Unix(10000, 0), - End: time.Unix(20000, 0), - }, - }, - // Run 4: Case where we trim some off the beginning - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "d": { - Metric: map[string]interface{}{"__name__": "d"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(100, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "d": { - Metric: map[string]interface{}{"__name__": "d"}, - Points: []Point{ - {Timestamp: time.Unix(300, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(300, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - extent: timeseries.Extent{ - Start: time.Unix(300, 0), - End: time.Unix(400, 0), - }, - }, - // Run 5: Case where we trim some off the ends - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "e": { - Metric: map[string]interface{}{"__name__": "e"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(100, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "e": { - Metric: map[string]interface{}{"__name__": "e"}, - Points: []Point{ - {Timestamp: time.Unix(200, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(200, 0), End: time.Unix(200, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - extent: timeseries.Extent{ - Start: time.Unix(200, 0), - End: time.Unix(200, 0), - }, - }, - // Run 6: Case where the last datapoint is on the Crop extent - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "f": { - Metric: map[string]interface{}{"__name__": "f"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(100, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "f": { - Metric: map[string]interface{}{"__name__": "f"}, - Points: []Point{ - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(200, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - extent: timeseries.Extent{ - Start: time.Unix(200, 0), - End: time.Unix(300, 0), - }, - }, - // Run 7: Case where we aren't given any datapoints - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "g": { - Metric: map[string]interface{}{"__name__": "g"}, - Points: []Point{}, - }, - }, - ExtentList: timeseries.ExtentList{}, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{}, - ExtentList: timeseries.ExtentList{}, 
- StepDuration: time.Duration(100) * time.Second, - }, - extent: timeseries.Extent{ - Start: time.Unix(200, 0), - End: time.Unix(300, 0), - }, - }, - - // Run 9: Case where after cropping, an inner series is empty/removed - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(100, 0), End: time.Unix(600, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(400, 0), End: time.Unix(600, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - extent: timeseries.Extent{ - Start: time.Unix(400, 0), - End: time.Unix(600, 0), - }, - }, - // Run 10: Case where after cropping, the front series is empty/removed - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(100, 0), End: time.Unix(600, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "b": { - Metric: 
map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(400, 0), End: time.Unix(600, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - extent: timeseries.Extent{ - Start: time.Unix(400, 0), - End: time.Unix(600, 0), - }, - }, - // Run 11: Case where after cropping, the back series is empty/removed - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(100, 0), Value: 1.5}, - {Timestamp: time.Unix(200, 0), Value: 1.5}, - {Timestamp: time.Unix(300, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(100, 0), End: time.Unix(600, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(400, 0), Value: 1.5}, - {Timestamp: time.Unix(500, 0), Value: 1.5}, - {Timestamp: time.Unix(600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(400, 0), End: time.Unix(600, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - extent: timeseries.Extent{ - Start: time.Unix(400, 0), - End: time.Unix(600, 0), - }, - }, - // Run 12: Case where we short circuit since the dataset is already entirely inside the crop range - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{}, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{}, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{}, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(200, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{}, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(200, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, 
- }, - extent: timeseries.Extent{ - Start: time.Unix(100, 0), - End: time.Unix(600, 0), - }, - }, - // Run 13: Case where we short circuit since the dataset is empty - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{}, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(200, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{}, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(300, 0), End: time.Unix(300, 0)}, - }, - StepDuration: time.Duration(100) * time.Second, - }, - extent: timeseries.Extent{ - Start: time.Unix(300, 0), - End: time.Unix(600, 0), - }, - }, - } - - for i, test := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - test.before.CropToRange(test.extent) - if !reflect.DeepEqual(test.before, test.after) { - t.Errorf("mismatch\nexpected=%v\ngot=%v", test.after, test.before) + test := func(run string) { + t.Run(run, func(t *testing.T) { + before.CropToRange(extent) + if !reflect.DeepEqual(before, after) { + t.Errorf("mismatch\nexpected=%v\ngot=%v", after, before) } }) } -} -const testStep = time.Duration(10) * time.Second + before = testRe().addPoint(1644004600, 1.5).addExtent(1644004600).setStep("10s") + after = testRe().addPoint(1644004600, 1.5).addExtent(1644004600).setStep("10s") + extent = testEx(0, 1644004600) + test("First element as timestamp matching extent end") + + before = testRe().addPoint(1544004600, 1.5).addExtent(1544004600).setStep("10s") + after = testRe().addPoint(1544004600, 1.5).addExtent(1544004600).setStep("10s") + extent = testEx(0, 1644004600) + test("Trim nothing when series within crop range") + + before = testRe().addPoint(1544004600, 1.5).addExtent(1544004600).setStep("10s") + after = testRe().setStep("10s") + extent = testEx(0, 10) + test("Trim everything when all data is too late") + + before = testRe().addPoint(100, 1.5).addExtent(100).setStep("10s") + after = testRe().setStep("10s") + extent = testEx(10000, 20000) + test("Trim everything when all data is too early") + + before = testRe().addPoint(100, 1.5).addPoint(200, 1.5).addPoint(300, 1.5).addExtents(100, 300).setStep("100s") + after = testRe().addPoint(300, 1.5).addExtent(300).setStep("100s") + extent = testEx(300, 400) + test("Trim some off the beginning") + + before = testRe().addPoint(100, 1.5).addPoint(200, 1.5).addPoint(300, 1.5).addExtents(100, 300).setStep("100s") + after = testRe().addPoint(200, 1.5).addExtent(200).setStep("100s") + extent = testEx(200, 200) + test("Trim some off of both ends") + + before = testRe().addPoint(100, 1.5).addPoint(200, 1.5).addPoint(300, 1.5).addExtents(100, 300).setStep("100s") + after = testRe().addPoint(200, 1.5).addPoint(300, 1.5).addExtents(200, 300).setStep("100s") + extent = testEx(200, 300) + test("last datapoint is on the Crop extent") + + before = testRe().setStep("100s") + after = testRe().setStep("100s") + extent = testEx(200, 300) + test("no data in series provided") + + before = testRe().addExtents(100, 300).setStep("100s") + after = testRe().addExtents(100, 300).setStep("100s") + extent = testEx(0, 500) + test("empty extents fully inside of crop range") + + before = testRe().addExtents(0, 600).setStep("100s") + after = testRe().addExtents(100, 500).setStep("100s") + extent = testEx(100, 500) + test("adjust empty series to crop range") +} func TestCropToSize(t *testing.T) { - + var size int + var bft time.Time now := time.Now().Truncate(testStep) - - tests := []struct { - before, 
after *ResultsEnvelope - size int - bft time.Time - extent timeseries.Extent - }{ - // case 0: where we already have the number of timestamps we are cropping to - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1444004600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1444004600, 0), End: time.Unix(1444004600, 0)}, - }, - StepDuration: testStep, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1444004600, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1444004600, 0), End: time.Unix(1444004600, 0)}, - }, - StepDuration: testStep, - timestamps: map[time.Time]bool{time.Unix(1444004600, 0): true}, - tslist: times.Times{time.Unix(1444004600, 0)}, - isCounted: true, - }, - extent: timeseries.Extent{ - Start: time.Unix(0, 0), - End: time.Unix(1444004600, 0), - }, - size: 1, - bft: now, - }, - - // case 1 - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1444004600, 0), Value: 1.5}, - {Timestamp: time.Unix(1444004610, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1444004600, 0), End: time.Unix(1444004610, 0)}, - }, - StepDuration: testStep, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1444004610, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1444004610, 0), End: time.Unix(1444004610, 0)}, - }, - StepDuration: testStep, - timestamps: map[time.Time]bool{time.Unix(1444004610, 0): true}, - tslist: times.Times{time.Unix(1444004610, 0)}, - isCounted: true, - isSorted: true, - }, - extent: timeseries.Extent{ - Start: time.Unix(0, 0), - End: time.Unix(1444004610, 0), - }, - size: 1, - bft: now, - }, - - // case 2 - empty extent list - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{}, - }, - }, - ExtentList: timeseries.ExtentList{}, - StepDuration: testStep, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{}, - ExtentList: timeseries.ExtentList{}, - StepDuration: testStep, - }, - extent: timeseries.Extent{}, - size: 1, - bft: now, - }, - - // case 3 - backfill tolerance - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1444004610, 0), Value: 1.5}, - {Timestamp: now, Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1444004610, 0), End: now}, - }, - StepDuration: testStep, - }, - after: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1444004610, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1444004610, 0), End: now.Add(-5 * time.Minute)}, - }, - StepDuration: testStep, - timestamps: map[time.Time]bool{time.Unix(1444004610, 0): true}, - tslist: times.Times{time.Unix(1444004610, 0)}, - isCounted: 
true, - isSorted: false, - }, - extent: timeseries.Extent{ - Start: time.Unix(0, 0), - End: now, - }, - size: 2, - bft: now.Add(-5 * time.Minute), - }, - } - - for i, test := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - test.before.CropToSize(test.size, test.bft, test.extent) - - for i := range test.before.ExtentList { - test.before.ExtentList[i].LastUsed = time.Time{} + nowSec := now.Unix() + test := func(run string) { + t.Run(run, func(t *testing.T) { + before.CropToSize(size, bft, extent) + for i := range before.ExtentList { + before.ExtentList[i].LastUsed = time.Time{} } - - if !reflect.DeepEqual(test.before, test.after) { - t.Errorf("mismatch\nexpected=%v\n got=%v", test.after, test.before) + if !reflect.DeepEqual(before, after) { + t.Errorf("mismatch\nexpected=%v\n got=%v", after, before) } }) } + + before = testRe().addPoint(1444004600, 1.5).addExtent(1444004600).setStep("10s") + after = testRe().addPoint(1444004600, 1.5).addExtent(1444004600).setStep("10s") + after.updateTimestamps() + extent = testEx(0, 1444004600) + size = 1 + bft = now + test("Size already less than crop") + + before = testRe().addPoint(1444004600, 1.5).addPoint(1444004610, 1.5).addExtents(1444004600, 1444004610).setStep("10s") + after = testRe().addPoint(1444004610, 1.5).addExtent(1444004610).setStep("10s") + after.Sort() + extent = testEx(0, 1444004610) + size = 1 + bft = now + test("Crop least recently used") + + before = testRe().setStep("10s") + after = testRe().setStep("10s") + extent = testEx(0, nowSec) + size = 2 + bft = now + test("Empty extent list") + + before = testRe().addPoint(1444004610, 1.5).addPoint(int(nowSec), 1.5).addExtents(1444004610, nowSec) + after = testRe().addPoint(1444004610, 1.5).addExtents(1444004610, nowSec-300) + after.updateTimestamps() + size = 2 + extent = testEx(0, nowSec) + bft = now.Add(-5 * time.Minute) + test("Backfill tolerance") } func TestUpdateTimestamps(t *testing.T) { - // test edge condition here (core functionality is tested across this file) re := ResultsEnvelope{isCounted: true} re.updateTimestamps() if re.timestamps != nil { t.Errorf("expected nil map, got size %d", len(re.timestamps)) } - } func TestClone(t *testing.T) { - - tests := []struct { - before *ResultsEnvelope - }{ - // Run 0 - { - before: &ResultsEnvelope{ - Meta: []FieldDefinition{{Name: "1", Type: "string"}}, - Serializers: map[string]func(interface{}){"test": nil}, - tslist: times.Times{time.Unix(1644001200, 0)}, - timestamps: map[time.Time]bool{time.Unix(1644001200, 0): true}, - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1644001200, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(1644001200, 0), End: time.Unix(1644001200, 0)}, - }, - StepDuration: time.Duration(3600) * time.Second, - SeriesOrder: []string{"a"}, - }, - }, - - // Run 1 - { - before: &ResultsEnvelope{ - tslist: times.Times{time.Unix(1644001200, 0), time.Unix(1644004800, 0)}, - timestamps: map[time.Time]bool{time.Unix(1644001200, 0): true, time.Unix(1644004800, 0): true}, - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1644001200, 0), Value: 1.5}, - }, - }, - - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(1644001200, 0), Value: 1.5}, - {Timestamp: time.Unix(1644004800, 0), Value: 1.5}, - }, - }, - }, - ExtentList: timeseries.ExtentList{ 
- timeseries.Extent{Start: time.Unix(1644001200, 0), End: time.Unix(1644004800, 0)}, - }, - StepDuration: time.Duration(3600) * time.Second, - SeriesOrder: []string{"a", "b"}, - }, - }, - } - - for i, test := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - after := test.before.Clone() - if !reflect.DeepEqual(test.before, after) { - t.Errorf("mismatch\nexpected %v\nactual %v", test.before, after) - } - }) + before = testRe().addPoint(1644001200, 1.5).addPoint(1644004800, 77.4).addExtents(1644001200, 1644004800).setStep("3600s") + before.Sort() + after := before.Clone() + after.Sort() + if !reflect.DeepEqual(before, after) { + t.Errorf("mismatch\nexpected %v\nactual %v", before, after) } - } func TestSort(t *testing.T) { - tests := []struct { - before, after *ResultsEnvelope - extent timeseries.Extent - }{ - // Case where we trim nothing - { - before: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1544004200, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004800, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004000, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004000, 0), Value: 1.5}, // sort should also dupe kill - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004200, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004000, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004800, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(1544004800, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004200, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004000, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - }, - }, - }, - }, - after: &ResultsEnvelope{ - isSorted: true, - isCounted: true, - tslist: []time.Time{time.Unix(1544004000, 0), - time.Unix(1544004200, 0), time.Unix(1544004600, 0), time.Unix(1544004800, 0)}, - timestamps: map[time.Time]bool{time.Unix(1544004000, 0): true, time.Unix(1544004200, 0): true, - time.Unix(1544004600, 0): true, time.Unix(1544004800, 0): true}, - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(1544004000, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004200, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004800, 0), Value: 1.5}, - }, - }, - "b": { - Metric: map[string]interface{}{"__name__": "b"}, - Points: []Point{ - {Timestamp: time.Unix(1544004000, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004200, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004800, 0), Value: 1.5}, - }, - }, - "c": { - Metric: map[string]interface{}{"__name__": "c"}, - Points: []Point{ - {Timestamp: time.Unix(1544004000, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004200, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004600, 0), Value: 1.5}, - {Timestamp: time.Unix(1544004800, 0), Value: 1.5}, - }, - }, - }, - }, - }, + before = testRe().addPoint(1544004200, 1.5).addPoint(1544004600, 1.5).addPoint(1544004800, 1.5). 
+ addPoint(1544004000, 1.5).addPoint(1544004000, 1.5) + after = testRe().addPoint(1544004000, 1.5).addPoint(1544004200, 1.5).addPoint(1544004600, 1.5).addPoint(1544004800, 1.5) + after.isCounted = true + after.isSorted = true + after.tsList = []time.Time{time.Unix(1544004000, 0), + time.Unix(1544004200, 0), time.Unix(1544004600, 0), time.Unix(1544004800, 0)} + after.timestamps = map[time.Time]bool{time.Unix(1544004000, 0): true, time.Unix(1544004200, 0): true, + time.Unix(1544004600, 0): true, time.Unix(1544004800, 0): true} + + before.isSorted = false + before.Sort() + if !reflect.DeepEqual(before, after) { + t.Errorf("mismatch\nexpected=%v\n actual=%v", after, before) } - - for i, test := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - test.before.isSorted = false - test.before.Sort() - if !reflect.DeepEqual(test.before, test.after) { - t.Errorf("mismatch\nexpected=%v\n actual=%v", test.after, test.before) - } - // test isSorted short circuit - test.before.Sort() - if !reflect.DeepEqual(test.before, test.after) { - t.Errorf("mismatch\nexpected=%v\n actual=%v", test.after, test.before) - } - }) + // test isSorted short circuit + before.Sort() + if !reflect.DeepEqual(before, after) { + t.Errorf("mismatch\nexpected=%v\n actual=%v", after, before) } } @@ -1341,171 +263,35 @@ func TestExtents(t *testing.T) { } func TestSeriesCount(t *testing.T) { - re := &ResultsEnvelope{ - Data: map[string]*DataSet{ - "d": { - Metric: map[string]interface{}{"__name__": "d"}, - Points: []Point{ - {Timestamp: time.Unix(99, 0), Value: 1.5}, - {Timestamp: time.Unix(199, 0), Value: 1.5}, - {Timestamp: time.Unix(299, 0), Value: 1.5}, - }, - }, - }, - } + re := testRe().addPoint(99, 1.5).addPoint(199, 1.5).addPoint(299, 1.5) if re.SeriesCount() != 1 { t.Errorf("expected 1 got %d.", re.SeriesCount()) } } func TestValueCount(t *testing.T) { - re := &ResultsEnvelope{ - Data: map[string]*DataSet{ - "d": { - Metric: map[string]interface{}{"__name__": "d"}, - Points: []Point{ - {Timestamp: time.Unix(99, 0), Value: 1.5}, - {Timestamp: time.Unix(199, 0), Value: 1.5}, - {Timestamp: time.Unix(299, 0), Value: 1.5}, - }, - }, - }, - } + re := testRe().addPoint(99, 1.5).addPoint(199, 1.5).addPoint(299, 1.5) if re.ValueCount() != 3 { t.Errorf("expected 3 got %d.", re.ValueCount()) } } func TestTimestampCount(t *testing.T) { - - tests := []struct { - ts *ResultsEnvelope - expected int - }{ - { - ts: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "d": { - Metric: map[string]interface{}{"__name__": "d"}, - Points: []Point{ - {Timestamp: time.Unix(99, 0), Value: 1.5}, - {Timestamp: time.Unix(199, 0), Value: 1.5}, - {Timestamp: time.Unix(299, 0), Value: 1.5}, - }, - }, - }, - }, - expected: 3, - }, - - { - ts: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "d": { - Metric: map[string]interface{}{"__name__": "d"}, - Points: []Point{ - {Timestamp: time.Unix(99, 0), Value: 1.5}, - {Timestamp: time.Unix(199, 0), Value: 1.5}, - }, - }, - }, - }, - expected: 2, - }, - - { - ts: &ResultsEnvelope{ - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "d"}, - Points: []Point{ - {Timestamp: time.Unix(99, 0), Value: 1.5}, - {Timestamp: time.Unix(199, 0), Value: 1.5}, - }, - }, - "e": { - Metric: map[string]interface{}{"__name__": "e"}, - Points: []Point{ - {Timestamp: time.Unix(99, 0), Value: 1.5}, - {Timestamp: time.Unix(299, 0), Value: 1.5}, - }, - }, - }, - }, - expected: 3, - }, + before = testRe().addPoint(99, 1.5).addPoint(199, 1.5).addPoint(299, 1.5) + if before.TimestampCount() != 3 { + 
t.Errorf("expected 3 got %d.", before.TimestampCount()) } - - for i, test := range tests { - t.Run(strconv.Itoa(i), func(t *testing.T) { - tc := test.ts.TimestampCount() - if tc != test.expected { - t.Errorf("expected %d got %d.", test.expected, tc) - } - }) + before = testRe().addPoint(99, 1.5).addPoint(199, 1.5) + if before.TimestampCount() != 2 { + t.Errorf("expected 2 got %d.", before.TimestampCount()) } } -func TestMergeSeriesOrder(t *testing.T) { - - re := ResultsEnvelope{} - so1 := []string{"a", "e"} - re.mergeSeriesOrder(so1) - if !reflect.DeepEqual(re.SeriesOrder, so1) { - t.Errorf("expected [%s] got [%s]", strings.Join(so1, ","), strings.Join(re.SeriesOrder, ",")) - } - re.Data = map[string]*DataSet{"a": nil, "e": nil} - - so2 := []string{"d", "e"} - ex2 := []string{"a", "d", "e"} - re.mergeSeriesOrder(so2) - if !reflect.DeepEqual(re.SeriesOrder, ex2) { - t.Errorf("expected [%s] got [%s]", strings.Join(ex2, ","), strings.Join(re.SeriesOrder, ",")) - } - re.Data = map[string]*DataSet{"a": nil, "d": nil, "e": nil} - - so3 := []string{"b", "c", "e"} - ex3 := []string{"a", "d", "b", "c", "e"} - re.mergeSeriesOrder(so3) - if !reflect.DeepEqual(re.SeriesOrder, ex3) { - t.Errorf("expected [%s] got [%s]", strings.Join(ex3, ","), strings.Join(re.SeriesOrder, ",")) - } - re.Data = map[string]*DataSet{"a": nil, "d": nil, "b": nil, "c": nil, "e": nil} - - so4 := []string{"f"} - ex4 := []string{"a", "d", "b", "c", "e", "f"} - re.mergeSeriesOrder(so4) - if !reflect.DeepEqual(re.SeriesOrder, ex4) { - t.Errorf("expected [%s] got [%s]", strings.Join(ex4, ","), strings.Join(re.SeriesOrder, ",")) - } - -} - func TestSize(t *testing.T) { - r := &ResultsEnvelope{ - isCounted: true, - isSorted: true, - tslist: times.Times{time.Unix(5, 0), time.Unix(10, 0), time.Unix(15, 0)}, - timestamps: map[time.Time]bool{time.Unix(5, 0): true, time.Unix(10, 0): true, time.Unix(15, 0): true}, - Data: map[string]*DataSet{ - "a": { - Metric: map[string]interface{}{"__name__": "a"}, - Points: []Point{ - {Timestamp: time.Unix(5, 0), Value: 1.5}, - {Timestamp: time.Unix(10, 0), Value: 1.5}, - {Timestamp: time.Unix(15, 0), Value: 1.5}, - }, - }, - }, - Meta: []FieldDefinition{{Name: "test", Type: "Test"}}, - SeriesOrder: []string{"test"}, - ExtentList: timeseries.ExtentList{ - timeseries.Extent{Start: time.Unix(5, 0), End: time.Unix(15, 0)}, - }, - StepDuration: time.Duration(5) * time.Second, - } - i := r.Size() - const expected = 370 + re := testRe().addPoint(5, 1.5).addPoint(10, 1.5).addPoint(15, 1.5).addExtents(5, 15).setStep("5s") + re.Sort() + i := re.Size() + const expected = 173 if i != expected { t.Errorf("expected %d got %d", expected, i) } diff --git a/pkg/proxy/origins/clickhouse/tokenization.go b/pkg/proxy/origins/clickhouse/tokenization.go deleted file mode 100644 index de74938ca..000000000 --- a/pkg/proxy/origins/clickhouse/tokenization.go +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Copyright 2018 Comcast Cable Communications Management, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package clickhouse
-
-import (
-	"fmt"
-	"regexp"
-	"strconv"
-	"strings"
-	"time"
-
-	"github.com/tricksterproxy/trickster/pkg/timeseries"
-	"github.com/tricksterproxy/trickster/pkg/util/regexp/matching"
-)
-
-// This file handles tokenization of time parameters within ClickHouse queries
-// for cache key hashing and delta proxy caching.
-
-// Tokens for String Interpolation
-const (
-	tkTimestamp1 = "<$TIMESTAMP1$>"
-	tkTimestamp2 = "<$TIMESTAMP2$>"
-)
-
-var reTimeFieldAndStep, reTimeClauseAlt *regexp.Regexp
-
-func init() {
-	reTimeFieldAndStep = regexp.MustCompile(`(?i)select\s+\(\s*intdiv\s*\(\s*touint32\s*\(\s*` +
-		`(?P<timeField>[a-zA-Z0-9\._-]+)\s*\)\s*,\s*(?P<step>[0-9]+)\s*\)\s*\*\s*[0-9]+\s*\)`)
-	reTimeClauseAlt = regexp.MustCompile(`(?i)\s+(?P<expression>(?P<operator>>=|>|=|between)\s+` +
-		`(?P<modifier>toDate(Time)?)\((?P<ts1>[0-9]+)\)(?P<expression2>\s+and\s+toDate(Time)?\((?P<ts2>[0-9]+)\))?)`)
-}
-
-func interpolateTimeQuery(template, timeField string, extent *timeseries.Extent) string {
-	return strings.Replace(strings.Replace(template, tkTimestamp1,
-		strconv.Itoa(int(extent.Start.Unix())), -1), tkTimestamp2, strconv.Itoa(int(extent.End.Unix())), -1)
-}
-
-var compiledRe = make(map[string]*regexp.Regexp)
-
-const timeClauseRe = `(?i)(?P<preamble>where|and)\s+#TIME_FIELD#\s+(?P<timeExpr1>(?P<operator>>=|>|=|between)\s+` +
-	`(?P<modifier>toDate(Time)?)\((?P<ts1>[0-9]+)\))(?P<timeExpr2>\s+and\s+toDate(Time)?\((?P<ts2>[0-9]+)\))?`
-
-func getQueryParts(query string, timeField string) (string, timeseries.Extent, bool, error) {
-
-	tcKey := timeField + "-tc"
-	trex, ok := compiledRe[tcKey]
-	if !ok {
-		trex = regexp.MustCompile(strings.Replace(timeClauseRe, "#TIME_FIELD#", timeField, -1))
-		compiledRe[tcKey] = trex
-	}
-
-	m := matching.GetNamedMatches(trex, query, nil)
-	if len(m) == 0 {
-		return "", timeseries.Extent{}, false, fmt.Errorf("unable to parse time from query: %s", query)
-	}
-
-	ext, isRelativeTime, err := parseQueryExtents(query, m)
-	if err != nil {
-		return "", timeseries.Extent{}, false, err
-	}
-
-	tq := tokenizeQuery(query, m)
-	return tq, ext, isRelativeTime, err
-}
-
-// tokenizeQuery will take a ClickHouse query and replace all time conditionals with a single <$TIME_TOKEN$>
-func tokenizeQuery(query string, timeParts map[string]string) string {
-	// First check the existence of timeExpr1, and if exists, tokenize
-	if expr, ok := timeParts["timeExpr1"]; ok {
-		if modifier, ok := timeParts["modifier"]; ok {
-			query = strings.Replace(query, expr, fmt.Sprintf("BETWEEN %s(%s) AND %s(%s)",
-				modifier, tkTimestamp1, modifier, tkTimestamp2), -1)
-			// Then check the existence of timeExpr2, and if exists, remove from tokenized version
-			if expr, ok := timeParts["timeExpr2"]; ok {
-				query = strings.Replace(query, expr, "", -1)
-			}
-		}
-	}
-
-	if ts1, ok := timeParts["ts1"]; ok {
-		if strings.Contains(query, "("+ts1+")") {
-			m := matching.GetNamedMatches(reTimeClauseAlt, query, nil)
-			if len(m) > 0 {
-				if modifier, ok := m["modifier"]; ok {
-					if expression, ok := m["expression"]; ok {
-						query = strings.Replace(query, expression, fmt.Sprintf("BETWEEN %s(%s) AND %s(%s)",
-							modifier, tkTimestamp1, modifier, tkTimestamp2), -1)
-					}
-				}
-			}
-		}
-	}
-
-	return query
-}
-
-func parseQueryExtents(query string, timeParts map[string]string) (timeseries.Extent, bool, error) {
-
-	var e timeseries.Extent
-
-	isRelativeTime := true
-
-	op, ok := timeParts["operator"]
-	if !ok {
-		return e, false, fmt.Errorf("failed to parse query: %s", "could not find operator")
-	}
-
-	if t, ok := timeParts["ts1"]; ok {
-		i, err := strconv.ParseInt(t, 10, 64)
-		if err != nil {
-			return e, false, fmt.Errorf("failed to parse query: %s", "could not find start time")
-		}
-		e.Start = time.Unix(i, 0)
-	}
-
-	if strings.ToLower(op) == "between" {
-		isRelativeTime = false
-		if t, ok := timeParts["ts2"]; ok {
-			i, err := strconv.ParseInt(t, 10, 64)
-			if err != nil {
-				return e, false, fmt.Errorf("failed to parse query: %s", "could not determine end time")
-			}
-			e.End = time.Unix(i, 0)
-		} else {
-			return e, false, fmt.Errorf("failed to parse query: %s", "could not find end time")
-		}
-	} else {
-		e.End = time.Now()
-	}
-
-	return e, isRelativeTime, nil
-}
diff --git a/pkg/proxy/origins/clickhouse/tokenization_test.go b/pkg/proxy/origins/clickhouse/tokenization_test.go
deleted file mode 100644
index 7bfd4e2a2..000000000
--- a/pkg/proxy/origins/clickhouse/tokenization_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2018 Comcast Cable Communications Management, LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package clickhouse
-
-import (
-	"testing"
-)
-
-func TestGetQueryPartsFailure(t *testing.T) {
-	query := "this should fail to parse"
-	_, _, _, err := getQueryParts(query, "")
-	if err == nil {
-		t.Errorf("should have produced error")
-	}
-
-}
-
-func TestParseQueryExtents(t *testing.T) {
-
-	_, _, err := parseQueryExtents("", map[string]string{})
-	if err == nil {
-		t.Errorf("expected error: %s", `failed to parse query: could not find operator`)
-	}
-
-	_, _, err = parseQueryExtents("", map[string]string{"operator": "", "ts1": "a"})
-	if err == nil {
-		t.Errorf("expected error: %s", `failed to parse query: could not find start time`)
-	}
-
-	_, _, err = parseQueryExtents("", map[string]string{"operator": "between", "ts1": "1", "ts2": "a"})
-	if err == nil {
-		t.Errorf("expected error: %s", `failed to parse query: could not determine end time`)
-	}
-
-	_, _, err = parseQueryExtents("", map[string]string{"operator": "between", "ts1": "1"})
-	if err == nil {
-		t.Errorf("expected error: %s", `failed to parse query: could not find end time`)
-	}
-
-	_, _, err = parseQueryExtents("", map[string]string{"operator": "x", "ts1": "1"})
-	if err != nil {
-		t.Error(err)
-	}
-
-}
diff --git a/pkg/proxy/origins/clickhouse/url.go b/pkg/proxy/origins/clickhouse/url.go
index 31096ba1f..b7555418f 100644
--- a/pkg/proxy/origins/clickhouse/url.go
+++ b/pkg/proxy/origins/clickhouse/url.go
@@ -35,11 +35,14 @@ func (c *Client) SetExtent(r *http.Request, trq *timeseries.TimeRangeQuery, extent *timeseries.Extent) {
 	}
 
 	p := r.URL.Query()
-	t := trq.TemplateURL.Query()
-	q := t.Get(upQuery)
+	q := trq.TemplateURL.Query().Get(upQuery)
+
+	// Force gzip compression since Brotli is broken on CH 20.3
+	// See https://github.com/ClickHouse/ClickHouse/issues/9969
+	r.Header.Set("Accept-Encoding", "gzip")
 
 	if q != "" {
-		p.Set(upQuery, interpolateTimeQuery(q, trq.TimestampFieldName, extent))
+		p.Set(upQuery, interpolateTimeQuery(q, extent, trq.Step))
 	}
 
 	r.URL.RawQuery = p.Encode()
diff --git a/pkg/timeseries/timerangequery.go b/pkg/timeseries/timerangequery.go
index e2c9648d4..f1267f61f 100644
--- a/pkg/timeseries/timerangequery.go
+++ b/pkg/timeseries/timerangequery.go
@@ -42,6 +42,8 @@ type TimeRangeQuery struct {
 	FastForwardDisable bool
 	// IsOffset is true if the query uses a relative offset modifier
 	IsOffset bool
+	// BackfillTolerance can be updated to override the overall backfill tolerance per query
+	BackfillTolerance time.Duration
 }
 
 // Clone returns an exact copy of a TimeRangeQuery
@@ -124,6 +126,12 @@ func (trq *TimeRangeQuery) String() string {
 // GetBackfillTolerance will return the backfill tolerance for the query based on the provided
 // default, and any query-specific tolerance directives included in the query comments
 func (trq *TimeRangeQuery) GetBackfillTolerance(def time.Duration) time.Duration {
+	if trq.BackfillTolerance > 0 {
+		return trq.BackfillTolerance
+	}
+	if trq.BackfillTolerance < 0 {
+		return 0
+	}
 	if x := strings.Index(trq.Statement, "trickster-backfill-tolerance:"); x > 1 {
 		x += 29
 		y := x