diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go index d5b35ec6b69f6..287fd6a312129 100644 --- a/pkg/loghttp/query.go +++ b/pkg/loghttp/query.go @@ -69,15 +69,8 @@ func (s *LogProtoStream) UnmarshalJSON(data []byte) error { err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error { switch string(key) { case "stream": - labels := make(LabelSet) - err := jsonparser.ObjectEach(val, func(key, val []byte, dataType jsonparser.ValueType, _ int) error { - if dataType != jsonparser.String { - return jsonparser.MalformedStringError - } - labels[string(key)] = string(val) - return nil - }) - if err != nil { + var labels LabelSet + if err := labels.UnmarshalJSON(val); err != nil { return err } s.Labels = labels.String() diff --git a/pkg/util/unmarshal/unmarshal_test.go b/pkg/util/unmarshal/unmarshal_test.go index 34e4c2dbf3503..9fdaf27512127 100644 --- a/pkg/util/unmarshal/unmarshal_test.go +++ b/pkg/util/unmarshal/unmarshal_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/loghttp" @@ -15,24 +16,28 @@ import ( "github.com/grafana/loki/pkg/util/marshal" ) -// covers requests to /loki/api/v1/push -var pushTests = []struct { - expected []logproto.Stream - actual string -}{ - { - []logproto.Stream{ - { - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(0, 123456789012345), - Line: "super line", +func Test_DecodePushRequest(t *testing.T) { + // covers requests to /loki/api/v1/push + for _, tc := range []struct { + name string + expected []logproto.Stream + expectedErr bool + actual string + }{ + { + name: "basic", + expected: []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 123456789012345), + Line: "super line", + }, }, + Labels: labels.FromStrings("test", "test").String(), }, - Labels: `{test="test"}`, }, - }, - `{ + actual: `{ "streams": [ { "stream": { @@ -44,24 +49,25 @@ var pushTests = []struct { } ] }`, - }, - { - []logproto.Stream{ - { - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(0, 123456789012345), - Line: "super line", - StructuredMetadata: []logproto.LabelAdapter{ - {Name: "a", Value: "1"}, - {Name: "b", Value: "2"}, + }, + { + name: "with structured metadata", + expected: []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 123456789012345), + Line: "super line", + StructuredMetadata: []logproto.LabelAdapter{ + {Name: "a", Value: "1"}, + {Name: "b", Value: "2"}, + }, }, }, + Labels: labels.FromStrings("test", "test").String(), }, - Labels: `{test="test"}`, }, - }, - `{ + actual: `{ "streams": [ { "stream": { @@ -73,18 +79,100 @@ var pushTests = []struct { } ] }`, - }, -} + }, -func Test_DecodePushRequest(t *testing.T) { - for i, pushTest := range pushTests { - var actual logproto.PushRequest - closer := io.NopCloser(strings.NewReader(pushTest.actual)) + // The following test cases are added to cover a regression. Even though the Loki HTTP API + // docs for the push endpoint state that the stream label values should be strings, we + // previously didn't enforce this requirement. + // With https://github.com/grafana/loki/pull/9694, we started enforcing this requirement + // and that broke some users. We are adding these test cases to ensure that we don't + // enforce this requirement in the future. Note that we may want to enforce this requirement + // in a future major release, in which case we should modify these test cases. + { + name: "number in stream label value", + expected: []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 123456789012345), + Line: "super line", + }, + }, + Labels: labels.FromStrings("test", "test", "number", "123").String(), + }, + }, + actual: `{ + "streams": [ + { + "stream": { + "test": "test", + "number": 123 + }, + "values":[ + [ "123456789012345", "super line" ] + ] + } + ] + }`, + }, + { + name: "string without quotes in stream label value", + expectedErr: true, + actual: `{ + "streams": [ + { + "stream": { + "test": "test", + "text": None + }, + "values":[ + [ "123456789012345", "super line" ] + ] + } + ] + }`, + }, + { + name: "json object in stream label value", + expected: []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 123456789012345), + Line: "super line", + }, + }, + Labels: labels.FromStrings("test", "test", "text", "{ \"a\": \"b\" }").String(), + }, + }, + actual: `{ + "streams": [ + { + "stream": { + "test": "test", + "text": { "a": "b" } + }, + "values":[ + [ "123456789012345", "super line" ] + ] + } + ] + }`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var actual logproto.PushRequest + closer := io.NopCloser(strings.NewReader(tc.actual)) - err := DecodePushRequest(closer, &actual) - require.NoError(t, err) + err := DecodePushRequest(closer, &actual) + if tc.expectedErr { + require.Error(t, err) + return + } + require.NoError(t, err) - require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i) + require.Equal(t, tc.expected, actual.Streams) + }) } }