Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unify time parsing logic in prometheus codec; clarify labels-series compatibility #10117

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ require (
github.com/dlclark/regexp2 v1.11.0 // indirect
github.com/docker/go-connections v0.4.1-0.20210727194412-58542c764a11 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4 // indirect
github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4
github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a // indirect
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb // indirect
github.com/fatih/color v1.16.0 // indirect
Expand Down
191 changes: 112 additions & 79 deletions pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/url"
"sort"
Expand Down Expand Up @@ -78,24 +77,24 @@ type Codec interface {
Merger
// DecodeMetricsQueryRequest decodes a MetricsQueryRequest from an http request.
DecodeMetricsQueryRequest(context.Context, *http.Request) (MetricsQueryRequest, error)
// DecodeLabelsQueryRequest decodes a LabelsQueryRequest from an http request.
DecodeLabelsQueryRequest(context.Context, *http.Request) (LabelsQueryRequest, error)
// DecodeLabelsSeriesQueryRequest decodes a LabelsSeriesQueryRequest from an http request.
DecodeLabelsSeriesQueryRequest(context.Context, *http.Request) (LabelsSeriesQueryRequest, error)
// DecodeMetricsQueryResponse decodes a Response from an http response.
// The original request is also passed as a parameter this is useful for implementation that needs the request
// to merge result or build the result correctly.
DecodeMetricsQueryResponse(context.Context, *http.Response, MetricsQueryRequest, log.Logger) (Response, error)
// DecodeLabelsQueryResponse decodes a Response from an http response.
// DecodeLabelsSeriesQueryResponse decodes a Response from an http response.
// The original request is also passed as a parameter this is useful for implementation that needs the request
// to merge result or build the result correctly.
DecodeLabelsQueryResponse(context.Context, *http.Response, LabelsQueryRequest, log.Logger) (Response, error)
DecodeLabelsSeriesQueryResponse(context.Context, *http.Response, LabelsSeriesQueryRequest, log.Logger) (Response, error)
// EncodeMetricsQueryRequest encodes a MetricsQueryRequest into an http request.
EncodeMetricsQueryRequest(context.Context, MetricsQueryRequest) (*http.Request, error)
// EncodeLabelsQueryRequest encodes a LabelsQueryRequest into an http request.
EncodeLabelsQueryRequest(context.Context, LabelsQueryRequest) (*http.Request, error)
// EncodeLabelsSeriesQueryRequest encodes a LabelsSeriesQueryRequest into an http request.
EncodeLabelsSeriesQueryRequest(context.Context, LabelsSeriesQueryRequest) (*http.Request, error)
// EncodeMetricsQueryResponse encodes a Response from a MetricsQueryRequest into an http response.
EncodeMetricsQueryResponse(context.Context, *http.Request, Response) (*http.Response, error)
// EncodeLabelsQueryResponse encodes a Response from a LabelsQueryRequest into an http response.
EncodeLabelsQueryResponse(context.Context, *http.Request, Response, bool) (*http.Response, error)
// EncodeLabelsSeriesQueryResponse encodes a Response from a LabelsSeriesQueryRequest into an http response.
EncodeLabelsSeriesQueryResponse(context.Context, *http.Request, Response, bool) (*http.Response, error)
}

// Merger is used by middlewares making multiple requests to merge back all responses into a single one.
Expand Down Expand Up @@ -153,8 +152,8 @@ type MetricsQueryRequest interface {
AddSpanTags(opentracing.Span)
}

// LabelsQueryRequest represents a label names or values query request that can be process by middlewares.
type LabelsQueryRequest interface {
// LabelsSeriesQueryRequest represents a label names, label values, or series query request that can be process by middlewares.
type LabelsSeriesQueryRequest interface {
// GetLabelName returns the label name param from a Label Values request `/api/v1/label/<label_name>/values`
// or an empty string for a Label Names request `/api/v1/labels`
GetLabelName() string
Expand All @@ -178,11 +177,11 @@ type LabelsQueryRequest interface {
// GetHeaders returns the HTTP headers in the request.
GetHeaders() []*PrometheusHeader
// WithLabelName clones the current request with a different label name param.
WithLabelName(string) (LabelsQueryRequest, error)
WithLabelName(string) (LabelsSeriesQueryRequest, error)
// WithLabelMatcherSets clones the current request with different label matchers.
WithLabelMatcherSets([]string) (LabelsQueryRequest, error)
WithLabelMatcherSets([]string) (LabelsSeriesQueryRequest, error)
// WithHeaders clones the current request with different headers.
WithHeaders([]*PrometheusHeader) (LabelsQueryRequest, error)
WithHeaders([]*PrometheusHeader) (LabelsSeriesQueryRequest, error)
// AddSpanTags writes information about this request to an OpenTracing span
AddSpanTags(opentracing.Span)
}
Expand Down Expand Up @@ -356,7 +355,7 @@ func (c prometheusCodec) decodeInstantQueryRequest(r *http.Request) (MetricsQuer
return nil, apierror.New(apierror.TypeBadData, err.Error())
}

time, err := DecodeInstantQueryTimeParams(&reqValues, time.Now)
time, err := DecodeInstantQueryTimeParams(&reqValues)
if err != nil {
return nil, DecorateWithParamName(err, "time")
}
Expand Down Expand Up @@ -388,16 +387,18 @@ func httpHeadersToProm(httpH http.Header) []*PrometheusHeader {
return headers
}

func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Request) (LabelsQueryRequest, error) {
func (prometheusCodec) DecodeLabelsSeriesQueryRequest(_ context.Context, r *http.Request) (LabelsSeriesQueryRequest, error) {
if !IsLabelsQuery(r.URL.Path) && !IsSeriesQuery(r.URL.Path) {
return nil, fmt.Errorf("unknown labels query API endpoint %s", r.URL.Path)
return nil, fmt.Errorf("unknown labels or series query API endpoint %s", r.URL.Path)
}

reqValues, err := util.ParseRequestFormWithoutConsumingBody(r)
if err != nil {
return nil, apierror.New(apierror.TypeBadData, err.Error())
}
start, end, err := DecodeLabelsQueryTimeParams(&reqValues, false)
// see DecodeLabelsSeriesQueryTimeParams for notes on time param parsing compatibility
// between label names, label values, and series requests
start, end, err := DecodeLabelsSeriesQueryTimeParams(&reqValues)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -445,26 +446,80 @@ func (prometheusCodec) DecodeLabelsQueryRequest(_ context.Context, r *http.Reque
}, nil
}

// TimeParamType enumerates the types of time parameters in Prometheus API.
// https://prometheus.io/docs/prometheus/latest/querying/api/
type TimeParamType int

const (
// RFC3339OrUnixMS represents the <rfc3339 | unix_timestamp> type in Prometheus Querying API docs
RFC3339OrUnixMS TimeParamType = iota
// DurationMS represents the <duration> type in Prometheus Querying API docs
DurationMS
// DurationMSOrFloatMS represents the <duration | float> in Prometheus Querying API docs
DurationMSOrFloatMS
)

// PromTimeParamDecoder provides common functionality for decoding Prometheus time parameters.
type PromTimeParamDecoder struct {
paramName string
timeType TimeParamType
isOptional bool
defaultMSFunc func() int64
}

func (p PromTimeParamDecoder) Decode(reqValues *url.Values) (int64, error) {
rawValue := reqValues.Get(p.paramName)
if rawValue == "" {
if p.isOptional {
if p.defaultMSFunc != nil {
return p.defaultMSFunc(), nil
}
return 0, nil
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
}
return 0, apierror.New(apierror.TypeBadData, fmt.Sprintf("missing required parameter %q", p.timeType))
}

var t int64
var err error
switch p.timeType {
case RFC3339OrUnixMS:
t, err = util.ParseTime(rawValue)
case DurationMS, DurationMSOrFloatMS:
t, err = util.ParseDurationMS(rawValue)
dimitarvdimitrov marked this conversation as resolved.
Show resolved Hide resolved
default:
return 0, apierror.New(apierror.TypeInternal, fmt.Sprintf("unknown time type %v", p.timeType))
}
if err != nil {
return 0, DecorateWithParamName(err, p.paramName)
}

return t, nil
}

var rangeStartParamDecodable = PromTimeParamDecoder{"start", RFC3339OrUnixMS, false, nil}
var rangeEndParamDecodable = PromTimeParamDecoder{"end", RFC3339OrUnixMS, false, nil}
var rangeStepEndParamDecodable = PromTimeParamDecoder{"step", DurationMSOrFloatMS, false, nil}

// DecodeRangeQueryTimeParams encapsulates Prometheus instant query time param parsing,
// emulating the logic in prometheus/prometheus/web/api/v1#API.query_range.
func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64, err error) {
start, err = util.ParseTime(reqValues.Get("start"))
start, err = rangeStartParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, 0, DecorateWithParamName(err, "start")
return 0, 0, 0, err
}

end, err = util.ParseTime(reqValues.Get("end"))
end, err = rangeEndParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, 0, DecorateWithParamName(err, "end")
return 0, 0, 0, err
}

if end < start {
return 0, 0, 0, errEndBeforeStart
}

step, err = parseDurationMs(reqValues.Get("step"))
step, err = rangeStepEndParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, 0, DecorateWithParamName(err, "step")
return 0, 0, 0, err
}

if step <= 0 {
Expand All @@ -480,55 +535,47 @@ func DecodeRangeQueryTimeParams(reqValues *url.Values) (start, end, step int64,
return start, end, step, nil
}

func instantTimeParamNow() int64 {
return time.Now().UTC().UnixMilli()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the UTC() necessary? I thought UnixMilli is only defined in UTC

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah true. I am just in the habit of making sure anything with time gets UTC-ed

}

var instantTimeParamDecodable = PromTimeParamDecoder{"time", RFC3339OrUnixMS, true, instantTimeParamNow}

// DecodeInstantQueryTimeParams encapsulates Prometheus instant query time param parsing,
// emulating the logic in prometheus/prometheus/web/api/v1#API.query.
func DecodeInstantQueryTimeParams(reqValues *url.Values, defaultNow func() time.Time) (time int64, err error) {
timeVal := reqValues.Get("time")
if timeVal == "" {
time = defaultNow().UnixMilli()
} else {
time, err = util.ParseTime(timeVal)
if err != nil {
return 0, DecorateWithParamName(err, "time")
}
func DecodeInstantQueryTimeParams(reqValues *url.Values) (time int64, err error) {
time, err = instantTimeParamDecodable.Decode(reqValues)
if err != nil {
return 0, err
}

return time, err
}

// DecodeLabelsQueryTimeParams encapsulates Prometheus label names and label values query time param parsing,
// emulating the logic in prometheus/prometheus/web/api/v1#API.labelNames and v1#API.labelValues.
//
// Setting `usePromDefaults` true will set missing timestamp params to the Prometheus default
// min and max query timestamps; false will default to 0 for missing timestamp params.
func DecodeLabelsQueryTimeParams(reqValues *url.Values, usePromDefaults bool) (start, end int64, err error) {
var defaultStart, defaultEnd int64
if usePromDefaults {
defaultStart = v1.MinTime.UnixMilli()
defaultEnd = v1.MaxTime.UnixMilli()
}

startVal := reqValues.Get("start")
if startVal == "" {
start = defaultStart
} else {
start, err = util.ParseTime(startVal)
if err != nil {
return 0, 0, DecorateWithParamName(err, "start")
}
// Label names, label values, and series codec applies the prometheus/web/api/v1.MinTime and MaxTime defaults on read
// with GetStartOrDefault/GetEndOrDefault, so we don't need to apply them with a defaultMSFunc here.
// This allows the object to be symmetrically decoded and encoded to and from the http request format,
// as well as indicating when an optional time parameter was not included in the original request.
var labelsStartParamDecodable = PromTimeParamDecoder{"start", RFC3339OrUnixMS, true, nil}
var labelsEndParamDecodable = PromTimeParamDecoder{"end", RFC3339OrUnixMS, true, nil}

// DecodeLabelsSeriesQueryTimeParams encapsulates Prometheus query time param parsing
// for label names, label values, and series endpoints, emulating prometheus/prometheus/web/api/v1.
// Note: the Prometheus HTTP API spec claims that the series endpoint `start` and `end` parameters
// are not optional, but the Prometheus implementation allows them to be optional.
// Until this changes we can reuse the same PromTimeParamDecoder structs as the label names and values endpoints.
func DecodeLabelsSeriesQueryTimeParams(reqValues *url.Values) (start, end int64, err error) {
start, err = labelsStartParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, err
}

endVal := reqValues.Get("end")
if endVal == "" {
end = defaultEnd
} else {
end, err = util.ParseTime(endVal)
if err != nil {
return 0, 0, DecorateWithParamName(err, "end")
}
end, err = labelsEndParamDecodable.Decode(reqValues)
if err != nil {
return 0, 0, err
}

if endVal != "" && end < start {
if end != 0 && end < start {
return 0, 0, errEndBeforeStart
}

Expand Down Expand Up @@ -649,7 +696,7 @@ func (c prometheusCodec) EncodeMetricsQueryRequest(ctx context.Context, r Metric
return req.WithContext(ctx), nil
}

func (c prometheusCodec) EncodeLabelsQueryRequest(ctx context.Context, req LabelsQueryRequest) (*http.Request, error) {
func (c prometheusCodec) EncodeLabelsSeriesQueryRequest(ctx context.Context, req LabelsSeriesQueryRequest) (*http.Request, error) {
var u *url.URL
switch req := req.(type) {
case *PrometheusLabelNamesQueryRequest:
Expand Down Expand Up @@ -823,7 +870,7 @@ func (c prometheusCodec) DecodeMetricsQueryResponse(ctx context.Context, r *http
return resp, nil
}

func (c prometheusCodec) DecodeLabelsQueryResponse(ctx context.Context, r *http.Response, lr LabelsQueryRequest, logger log.Logger) (Response, error) {
func (c prometheusCodec) DecodeLabelsSeriesQueryResponse(ctx context.Context, r *http.Response, lr LabelsSeriesQueryRequest, logger log.Logger) (Response, error) {
spanlog := spanlogger.FromContext(ctx, logger)
buf, err := readResponseBody(r)
if err != nil {
Expand Down Expand Up @@ -959,7 +1006,7 @@ func (c prometheusCodec) EncodeMetricsQueryResponse(ctx context.Context, req *ht
return &resp, nil
}

func (c prometheusCodec) EncodeLabelsQueryResponse(ctx context.Context, req *http.Request, res Response, isSeriesResponse bool) (*http.Response, error) {
func (c prometheusCodec) EncodeLabelsSeriesQueryResponse(ctx context.Context, req *http.Request, res Response, isSeriesResponse bool) (*http.Response, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
defer sp.Finish()

Expand Down Expand Up @@ -1156,20 +1203,6 @@ func readResponseBody(res *http.Response) ([]byte, error) {
return buf.Bytes(), nil
}

func parseDurationMs(s string) (int64, error) {
if d, err := strconv.ParseFloat(s, 64); err == nil {
ts := d * float64(time.Second/time.Millisecond)
if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) {
return 0, apierror.Newf(apierror.TypeBadData, "cannot parse %q to a valid duration. It overflows int64", s)
}
return int64(ts), nil
}
if d, err := model.ParseDuration(s); err == nil {
return int64(d) / int64(time.Millisecond/time.Nanosecond), nil
}
return 0, apierror.Newf(apierror.TypeBadData, "cannot parse %q to a valid duration", s)
}

func encodeTime(t int64) string {
f := float64(t) / 1.0e3
return strconv.FormatFloat(f, 'f', -1, 64)
Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/querymiddleware/codec_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestPrometheusCodec_JSONResponse_Labels(t *testing.T) {

for _, tc := range []struct {
name string
request LabelsQueryRequest
request LabelsSeriesQueryRequest
isSeriesResponse bool
responseHeaders http.Header
resp prometheusAPIResponse
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestPrometheusCodec_JSONResponse_Labels(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer(body)),
ContentLength: int64(len(body)),
}
decoded, err := codec.DecodeLabelsQueryResponse(context.Background(), httpResponse, tc.request, log.NewNopLogger())
decoded, err := codec.DecodeLabelsSeriesQueryResponse(context.Background(), httpResponse, tc.request, log.NewNopLogger())
if err != nil || tc.expectedErr != nil {
require.Equal(t, tc.expectedErr, err)
return
Expand All @@ -328,7 +328,7 @@ func TestPrometheusCodec_JSONResponse_Labels(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer(body)),
ContentLength: int64(len(body)),
}
encoded, err := codec.EncodeLabelsQueryResponse(context.Background(), httpRequest, decoded, tc.isSeriesResponse)
encoded, err := codec.EncodeLabelsSeriesQueryResponse(context.Background(), httpRequest, decoded, tc.isSeriesResponse)
require.NoError(t, err)

expectedJSON, err := readResponseBody(httpResponse)
Expand Down Expand Up @@ -554,7 +554,7 @@ func TestPrometheusCodec_JSONEncoding_Labels(t *testing.T) {
Header: http.Header{"Accept": []string{jsonMimeType}},
}

encoded, err := codec.EncodeLabelsQueryResponse(context.Background(), httpRequest, tc.response, tc.isSeriesResponse)
encoded, err := codec.EncodeLabelsSeriesQueryResponse(context.Background(), httpRequest, tc.response, tc.isSeriesResponse)
require.NoError(t, err)
require.Equal(t, http.StatusOK, encoded.StatusCode)
require.Equal(t, "application/json", encoded.Header.Get("Content-Type"))
Expand Down
Loading
Loading