Skip to content

Commit

Permalink
unify time parsing logic in prometheus codec; clarify labels-series c…
Browse files Browse the repository at this point in the history
…ompatibility (grafana#10117)

* unify time parsing logic in prometheus codec; clarify labels-series req
  • Loading branch information
francoposa authored and bjorns163 committed Dec 30, 2024
1 parent 5180939 commit 03192e5
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 128 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ require (
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/docker/go-connections v0.4.1-0.20210727194412-58542c764a11 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4 // indirect
github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4
github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a // indirect
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb // indirect
github.com/fatih/color v1.16.0 // indirect
Expand Down
191 changes: 112 additions & 79 deletions pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/url"
"sort"
Expand Down Expand Up @@ -78,24 +77,24 @@ type Codec interface {
Merger
// DecodeMetricsQueryRequest decodes a MetricsQueryRequest from an http request.
DecodeMetricsQueryRequest(context.Context, *http.Request) (MetricsQueryRequest, error)
// DecodeLabelsQueryRequest decodes a LabelsQueryRequest from an http request.
DecodeLabelsQueryRequest(context.Context, *http.Request) (LabelsQueryRequest, error)
// DecodeLabelsSeriesQueryRequest decodes a LabelsSeriesQueryRequest from an http request.
DecodeLabelsSeriesQueryRequest(context.Context, *http.Request) (LabelsSeriesQueryRequest, error)
// DecodeMetricsQueryResponse decodes a Response from an http response.
// The original request is also passed as a parameter this is useful for implementation that needs the request
// to merge result or build the result correctly.
DecodeMetricsQueryResponse(context.Context, *http.Response, MetricsQueryRequest, log.Logger) (Response, error)
// DecodeLabelsQueryResponse decodes a Response from an http response.
// DecodeLabelsSeriesQueryResponse decodes a Response from an http response.
// The original request is also passed as a parameter this is useful for implementation that needs the request
// to merge result or build the result correctly.
DecodeLabelsQueryResponse(context.Context, *http.Response, LabelsQueryRequest, log.Logger) (Response, error)
DecodeLabelsSeriesQueryResponse(context.Context, *http.Response, LabelsSeriesQueryRequest, log.Logger) (Response, error)
// EncodeMetricsQueryRequest encodes a MetricsQueryRequest into an http request.
EncodeMetricsQueryRequest(context.Context, MetricsQueryRequest) (*http.Request, error)
// EncodeLabelsQueryRequest encodes a LabelsQueryRequest into an http request.
EncodeLabelsQueryRequest(context.Context, LabelsQueryRequest) (*http.Request, error)
// EncodeLabelsSeriesQueryRequest encodes a LabelsSeriesQueryRequest into an http request.
EncodeLabelsSeriesQueryRequest(context.Context, LabelsSeriesQueryRequest) (*http.Request, error)
// EncodeMetricsQueryResponse encodes a Response from a MetricsQueryRequest into an http response.
EncodeMetricsQueryResponse(context.Context, *http.Request, Response) (*http.Response, error)
// EncodeLabelsQueryResponse encodes a Response from a LabelsQueryRequest into an http response.
EncodeLabelsQueryResponse(context.Context, *http.Request, Response, bool) (*http.Response, error)
// EncodeLabelsSeriesQueryResponse encodes a Response from a LabelsSeriesQueryRequest into an http response.
EncodeLabelsSeriesQueryResponse(context.Context, *http.Request, Response, bool) (*http.Response, error)
}

// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
Expand Down Expand Up @@ -153,8 +152,8 @@ type MetricsQueryRequest interface {
AddSpanTags(opentracing.Span)
}

// LabelsQueryRequest represents a label names or values query request that can be process by middlewares.
type LabelsQueryRequest interface {
// LabelsSeriesQueryRequest represents a label names, label values, or series query request that can be process by middlewares.
type LabelsSeriesQueryRequest interface {
// GetLabelName returns the label name param from a Label Values request `/api/v1/label/<label_name>/values`
// or an empty string for a Label Names request `/api/v1/labels`
GetLabelName() string
Expand All @@ -178,11 +177,11 @@ type LabelsQueryRequest interface {
// GetHeaders returns the HTTP headers in the request.
GetHeaders() []*PrometheusHeader
// WithLabelName clones the current request with a different label name param.
WithLabelName(string) (LabelsQueryRequest, error)
WithLabelName(string) (LabelsSeriesQueryRequest, error)
// WithLabelMatcherSets clones the current request with different label matchers.
WithLabelMatcherSets([]string) (LabelsQueryRequest, error)
WithLabelMatcherSets([]string) (LabelsSeriesQueryRequest, error)
// WithHeaders clones the current request with different headers.
WithHeaders([]*PrometheusHeader) (LabelsQueryRequest, error)
WithHeaders([]*PrometheusHeader) (LabelsSeriesQueryRequest, error)
// AddSpanTags writes information about this request to an OpenTracing span
AddSpanTags(opentracing.Span)
}
Expand Down Expand Up @@ -356,7 +355,7 @@ func (c prometheusCodec) decodeInstantQueryRequest(r *http.Request) (MetricsQuer
return nil, apierror.New(apierror.TypeBadData, err.Error())
}

time, err := DecodeInstantQueryTimeParams(&reqValues, time.Now)
time, err := DecodeInstantQueryTimeParams(&reqValues)
if err != nil {
return nil, DecorateWithParamName(err, "time")
}
Expand Down Expand Up @@ -388,16 +387,18 @@ func httpHeadersToProm(httpH http.Header) []*PrometheusHeader {
return headers
}

func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Request) (LabelsQueryRequest, error) {
func (prometheusCodec) DecodeLabelsSeriesQueryRequest(_ context.Context, r *http.Request) (LabelsSeriesQueryRequest, error) {
if !IsLabelsQuery(r.URL.Path) && !IsSeriesQuery(r.URL.Path) {
return nil, fmt.Errorf("unknown labels query API endpoint %s", r.URL.Path)
return nil, fmt.Errorf("unknown labels or series query API endpoint %s", r.URL.Path)
}

reqValues, err := util.ParseRequestFormWithoutConsumingBody(r)
if err != nil {
return nil, apierror.New(apierror.TypeBadData, err.Error())
}
start, end, err := DecodeLabelsQueryTimeParams(&reqValues, false)
// see DecodeLabelsSeriesQueryTimeParams for notes on time param parsing compatibility
// between label names, label values, and series requests
start, end, err := DecodeLabelsSeriesQueryTimeParams(&reqValues)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -445,26 +446,80 @@ func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Reque
}, nil
}

// TimeParamType enumerates the types of time parameters in Prometheus API.
// https://prometheus.io/docs/prometheus/latest/querying/api/
type TimeParamType int

const (
// RFC3339OrUnixMS represents the <rfc3339 | unix_timestamp> type in Prometheus Querying API docs
RFC3339OrUnixMS TimeParamType = iota
// DurationMS represents the <duration> type in Prometheus Querying API docs
DurationMS
// DurationMSOrFloatMS represents the <duration | float> in Prometheus Querying API docs
DurationMSOrFloatMS
)

// PromTimeParamDecoder provides common functionality for decoding Prometheus time parameters.
type PromTimeParamDecoder struct {
paramName string
timeType TimeParamType
isOptional bool
defaultMSFunc func() int64
}

func (p PromTimeParamDecoder) Decode(reqValues *url.Values) (int64, error) {
rawValue := reqValues.Get(p.paramName)
if rawValue == "" {
if p.isOptional {
if p.defaultMSFunc != nil {
return p.defaultMSFunc(), nil
}
return 0, nil
}
return 0, apierror.New(apierror.TypeBadData, fmt.Sprintf("missing required parameter %q", p.timeType))
}

var t int64
var err error
switch p.timeType {
case RFC3339OrUnixMS:
t, err = util.ParseTime(rawValue)
case DurationMS, DurationMSOrFloatMS:
t, err = util.ParseDurationMS(rawValue)
default:
return 0, apierror.New(apierror.TypeInternal, fmt.Sprintf("unknown time type %v", p.timeType))
}
if err != nil {
return 0, DecorateWithParamName(err, p.paramName)
}

return t, nil
}

var rangeStartParamDecodable = PromTimeParamDecoder{"start", RFC3339OrUnixMS, false, nil}
var rangeEndParamDecodable = PromTimeParamDecoder{"end", RFC3339OrUnixMS, false, nil}
var rangeStepEndParamDecodable = PromTimeParamDecoder{"step", DurationMSOrFloatMS, false, nil}

// DecodeRangeQueryTimeParams encapsulates Prometheus instant query time param parsing,
// emulating the logic in prometheus/prometheus/web/api/v1#API.query_range.
func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64, err error) {
start, err = util.ParseTime(reqValues.Get("start"))
start, err = rangeStartParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, 0, DecorateWithParamName(err, "start")
return 0, 0, 0, err
}

end, err = util.ParseTime(reqValues.Get("end"))
end, err = rangeEndParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, 0, DecorateWithParamName(err, "end")
return 0, 0, 0, err
}

if end < start {
return 0, 0, 0, errEndBeforeStart
}

step, err = parseDurationMs(reqValues.Get("step"))
step, err = rangeStepEndParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, 0, DecorateWithParamName(err, "step")
return 0, 0, 0, err
}

if step <= 0 {
Expand All @@ -480,55 +535,47 @@ func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64,
return start, end, step, nil
}

func instantTimeParamNow() int64 {
return time.Now().UTC().UnixMilli()
}

var instantTimeParamDecodable = PromTimeParamDecoder{"time", RFC3339OrUnixMS, true, instantTimeParamNow}

// DecodeInstantQueryTimeParams encapsulates Prometheus instant query time param parsing,
// emulating the logic in prometheus/prometheus/web/api/v1#API.query.
func DecodeInstantQueryTimeParams(reqValues *url.Values, defaultNow func() time.Time) (time int64, err error) {
timeVal := reqValues.Get("time")
if timeVal == "" {
time = defaultNow().UnixMilli()
} else {
time, err = util.ParseTime(timeVal)
if err != nil {
return 0, DecorateWithParamName(err, "time")
}
func DecodeInstantQueryTimeParams(reqValues *url.Values) (time int64, err error) {
time, err = instantTimeParamDecodable.Decode(reqValues)
if err != nil {
return 0, err
}

return time, err
}

// DecodeLabelsQueryTimeParams encapsulates Prometheus label names and label values query time param parsing,
// emulating the logic in prometheus/prometheus/web/api/v1#API.labelNames and v1#API.labelValues.
//
// Setting `usePromDefaults` true will set missing timestamp params to the Prometheus default
// min and max query timestamps; false will default to 0 for missing timestamp params.
func DecodeLabelsQueryTimeParams(reqValues *url.Values, usePromDefaults bool) (start, end int64, err error) {
var defaultStart, defaultEnd int64
if usePromDefaults {
defaultStart = v1.MinTime.UnixMilli()
defaultEnd = v1.MaxTime.UnixMilli()
}

startVal := reqValues.Get("start")
if startVal == "" {
start = defaultStart
} else {
start, err = util.ParseTime(startVal)
if err != nil {
return 0, 0, DecorateWithParamName(err, "start")
}
// Label names, label values, and series codec applies the prometheus/web/api/v1.MinTime and MaxTime defaults on read
// with GetStartOrDefault/GetEndOrDefault, so we don't need to apply them with a defaultMSFunc here.
// This allows the object to be symmetrically decoded and encoded to and from the http request format,
// as well as indicating when an optional time parameter was not included in the original request.
var labelsStartParamDecodable = PromTimeParamDecoder{"start", RFC3339OrUnixMS, true, nil}
var labelsEndParamDecodable = PromTimeParamDecoder{"end", RFC3339OrUnixMS, true, nil}

// DecodeLabelsSeriesQueryTimeParams encapsulates Prometheus query time param parsing
// for label names, label values, and series endpoints, emulating prometheus/prometheus/web/api/v1.
// Note: the Prometheus HTTP API spec claims that the series endpoint `start` and `end` parameters
// are not optional, but the Prometheus implementation allows them to be optional.
// Until this changes we can reuse the same PromTimeParamDecoder structs as the label names and values endpoints.
func DecodeLabelsSeriesQueryTimeParams(reqValues *url.Values) (start, end int64, err error) {
start, err = labelsStartParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, err
}

endVal := reqValues.Get("end")
if endVal == "" {
end = defaultEnd
} else {
end, err = util.ParseTime(endVal)
if err != nil {
return 0, 0, DecorateWithParamName(err, "end")
}
end, err = labelsEndParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, err
}

if endVal != "" && end < start {
if end != 0 && end < start {
return 0, 0, errEndBeforeStart
}

Expand Down Expand Up @@ -649,7 +696,7 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric
return req.WithContext(ctx), nil
}

func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req LabelsQueryRequest) (*http.Request, error) {
func (c prometheusCodec) EncodeLabelsSeriesQueryRequest(ctx context.Context, req LabelsSeriesQueryRequest) (*http.Request, error) {
var u *url.URL
switch req := req.(type) {
case *PrometheusLabelNamesQueryRequest:
Expand Down Expand Up @@ -823,7 +870,7 @@ func (c prometheusCodec) DecodeMetricsQueryResponse(ctx context.Context, r *http
return resp, nil
}

func (c prometheusCodec) DecodeLabelsQueryResponse(ctx context.Context, r *http.Response, lr LabelsQueryRequest, logger log.Logger) (Response, error) {
func (c prometheusCodec) DecodeLabelsSeriesQueryResponse(ctx context.Context, r *http.Response, lr LabelsSeriesQueryRequest, logger log.Logger) (Response, error) {
spanlog := spanlogger.FromContext(ctx, logger)
buf, err := readResponseBody(r)
if err != nil {
Expand Down Expand Up @@ -959,7 +1006,7 @@ func (c prometheusCodec) EncodeMetricsQueryResponse(ctx context.Context, req *ht
return &resp, nil
}

func (c prometheusCodec) EncodeLabelsQueryResponse(ctx context.Context, req *http.Request, res Response, isSeriesResponse bool) (*http.Response, error) {
func (c prometheusCodec) EncodeLabelsSeriesQueryResponse(ctx context.Context, req *http.Request, res Response, isSeriesResponse bool) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

Expand Down Expand Up @@ -1156,20 +1203,6 @@ func readResponseBody(res *http.Response) ([]byte, error) {
return buf.Bytes(), nil
}

func parseDurationMs(s string) (int64, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second/time.Millisecond)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, apierror.Newf(apierror.TypeBadData, "cannot parse %q to a valid duration. It overflows int64", s)
}
return int64(ts), nil
}
if d, err := model.ParseDuration(s); err == nil {
return int64(d) / int64(time.Millisecond/time.Nanosecond), nil
}
return 0, apierror.Newf(apierror.TypeBadData, "cannot parse %q to a valid duration", s)
}

func encodeTime(t int64) string {
f := float64(t) / 1.0e3
return strconv.FormatFloat(f, 'f', -1, 64)
Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/querymiddleware/codec_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestPrometheusCodec_JSONResponse_Labels(t *testing.T) {

for _, tc := range []struct {
name string
request LabelsQueryRequest
request LabelsSeriesQueryRequest
isSeriesResponse bool
responseHeaders http.Header
resp prometheusAPIResponse
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestPrometheusCodec_JSONResponse_Labels(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer(body)),
ContentLength: int64(len(body)),
}
decoded, err := codec.DecodeLabelsQueryResponse(context.Background(), httpResponse, tc.request, log.NewNopLogger())
decoded, err := codec.DecodeLabelsSeriesQueryResponse(context.Background(), httpResponse, tc.request, log.NewNopLogger())
if err != nil || tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, err)
return
Expand All @@ -328,7 +328,7 @@ func TestPrometheusCodec_JSONResponse_Labels(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer(body)),
ContentLength: int64(len(body)),
}
encoded, err := codec.EncodeLabelsQueryResponse(context.Background(), httpRequest, decoded, tc.isSeriesResponse)
encoded, err := codec.EncodeLabelsSeriesQueryResponse(context.Background(), httpRequest, decoded, tc.isSeriesResponse)
require.NoError(t, err)

expectedJSON, err := readResponseBody(httpResponse)
Expand Down Expand Up @@ -554,7 +554,7 @@ func TestPrometheusCodec_JSONEncoding_Labels(t *testing.T) {
Header: http.Header{"Accept": []string{jsonMimeType}},
}

encoded, err := codec.EncodeLabelsQueryResponse(context.Background(), httpRequest, tc.response, tc.isSeriesResponse)
encoded, err := codec.EncodeLabelsSeriesQueryResponse(context.Background(), httpRequest, tc.response, tc.isSeriesResponse)
require.NoError(t, err)
require.Equal(t, http.StatusOK, encoded.StatusCode)
require.Equal(t, "application/json", encoded.Header.Get("Content-Type"))
Expand Down
Loading

0 comments on commit 03192e5

Please sign in to comment.