Add metadata to push payload #9694

Merged: 39 commits, Jul 13, 2023

Commits
b799556
Add metadata to push payload
salvacorts Jun 13, 2023
b0810c8
Add metadata to tests
salvacorts Jun 13, 2023
b7ae131
update vendor push pkg
salvacorts Jun 13, 2023
9bec73a
WIP fix tests
salvacorts Jun 13, 2023
d876c21
Fix more tests
salvacorts Jun 13, 2023
9c6fdef
Revert some unwanted changes
salvacorts Jun 13, 2023
b4ed160
Remove more unwanted changes
salvacorts Jun 13, 2023
8c6f22e
Update vendor files
salvacorts Jun 13, 2023
c854d06
use json format for metadata labels in v1 push api
sandeepsukhani Jun 16, 2023
95ba8a2
rename metadataLabels to labels
sandeepsukhani Jun 16, 2023
7db3e54
Merge branch 'main' into salvacorts/metadata-push-payload
sandeepsukhani Jun 16, 2023
ea8f3f6
lint and some test fixes
sandeepsukhani Jun 16, 2023
73cf5dc
lint
sandeepsukhani Jun 16, 2023
7b77c69
lint lint lin...
sandeepsukhani Jun 16, 2023
44b0932
use map object instead of string representation for log labels in jso…
sandeepsukhani Jun 19, 2023
617e864
revert some accidental changes
sandeepsukhani Jun 19, 2023
f43dfab
Make labels in loghttp.Entry of type LabelSet
salvacorts Jun 29, 2023
384d556
Apply CR feedback
salvacorts Jun 29, 2023
4088112
Avoid allocations for empty labels
salvacorts Jun 29, 2023
0b0af55
Fix json encoding and add tests
salvacorts Jun 29, 2023
d4e311c
Change error message
salvacorts Jun 29, 2023
10d404e
Add more labels to tests cases
salvacorts Jun 29, 2023
fcd1e66
Fix encode and add more tests
salvacorts Jun 29, 2023
64d3d6c
Do not use struct
salvacorts Jun 30, 2023
96c804d
Merge branch 'main' into salvacorts/metadata-push-payload
sandeepsukhani Jul 5, 2023
7f25bb8
remove a TODO
sandeepsukhani Jul 5, 2023
752ae6b
rename to NonIndexedLabels
salvacorts Jul 10, 2023
4b67145
Add missing protos updates
salvacorts Jul 10, 2023
aa58916
Update vendor
salvacorts Jul 10, 2023
4c60a9e
Update after mod vendor
salvacorts Jul 10, 2023
4028758
Use slice of labels for json and proto structures
salvacorts Jul 11, 2023
8c76150
Fix format
salvacorts Jul 11, 2023
fb505b4
Move LabelsAdapter to push pkg
salvacorts Jul 12, 2023
ccaf192
Use same prometheus version in push pkg
salvacorts Jul 12, 2023
fb42e9c
Merge branch 'main' into salvacorts/metadata-push-payload
salvacorts Jul 12, 2023
eaf235c
Update vendor/github.com/grafana/loki/pkg/push/types.go
salvacorts Jul 12, 2023
7438809
Fix tests
salvacorts Jul 12, 2023
f6e6918
Don't use yolostring when deserializing push request
salvacorts Jul 12, 2023
f8c33aa
Remove yolostring
salvacorts Jul 13, 2023
22 changes: 16 additions & 6 deletions integration/client/client.go
@@ -13,6 +13,7 @@ import (
"strings"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
)

@@ -86,13 +87,21 @@ func New(instanceID, token, baseURL string, opts ...Option) *Client {

// PushLogLine creates a new logline with the current time as timestamp
func (c *Client) PushLogLine(line string, extraLabels ...map[string]string) error {
return c.pushLogLine(line, c.Now, extraLabels...)
return c.pushLogLine(line, c.Now, nil, extraLabels...)
}

func (c *Client) PushLogLineWithMetadata(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
return c.PushLogLineWithTimestampAndMetadata(line, c.Now, logLabels, extraLabels...)
}

// PushLogLineWithTimestamp creates a new logline at the given timestamp
// The timestamp has to be a Unix timestamp (epoch seconds)
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabelList ...map[string]string) error {
return c.pushLogLine(line, timestamp, extraLabelList...)
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabels ...map[string]string) error {
return c.pushLogLine(line, timestamp, nil, extraLabels...)
}

func (c *Client) PushLogLineWithTimestampAndMetadata(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
return c.pushLogLine(line, timestamp, labels.FromMap(logLabels), extraLabelList...)
}

func formatTS(ts time.Time) string {
@@ -101,21 +110,22 @@ func formatTS(ts time.Time) string {

type stream struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"`
Values [][]any `json:"values"`
}

// pushLogLine creates a new logline
func (c *Client) pushLogLine(line string, timestamp time.Time, extraLabelList ...map[string]string) error {
func (c *Client) pushLogLine(line string, timestamp time.Time, logLabels labels.Labels, extraLabelList ...map[string]string) error {
apiEndpoint := fmt.Sprintf("%s/loki/api/v1/push", c.baseURL)

s := stream{
Stream: map[string]string{
"job": "varlog",
},
Values: [][]string{
Values: [][]any{
{
formatTS(timestamp),
line,
logLabels,
},
},
}
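The diff above widens `Values` from `[][]string` to `[][]any` so that each entry can carry an optional third element holding per-line metadata. A minimal, self-contained sketch of the resulting payload shape (toy code, not the actual integration client; the timestamp and label values here are invented for illustration):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stream mirrors the shape used by the integration client above:
// each value is [timestamp, line, optional-metadata].
type stream struct {
	Stream map[string]string `json:"stream"`
	Values [][]any           `json:"values"`
}

func buildPayload() string {
	s := stream{
		Stream: map[string]string{"job": "varlog"},
		Values: [][]any{
			// The third element carries the non-indexed labels (metadata).
			{"1686041600000000000", "lineA", map[string]string{"traceID": "123"}},
		},
	}
	b, err := json.Marshal(map[string][]stream{"streams": {s}})
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(buildPayload())
	// {"streams":[{"stream":{"job":"varlog"},"values":[["1686041600000000000","lineA",{"traceID":"123"}]]}]}
}
```

This is the JSON body a client would POST to `/loki/api/v1/push`; entries without metadata simply omit the third element or send an empty object.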
5 changes: 4 additions & 1 deletion integration/loki_micro_services_delete_test.go
@@ -12,7 +12,6 @@ import (

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"

"github.com/grafana/loki/pkg/storage"
)

@@ -100,18 +99,22 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
{
strconv.FormatInt(now.Add(-48*time.Hour).UnixNano(), 10),
"lineA",
"",
},
{
strconv.FormatInt(now.Add(-48*time.Hour).UnixNano(), 10),
"lineB",
"",
},
{
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10),
"lineC",
"",
},
{
strconv.FormatInt(now.Add(-time.Minute).UnixNano(), 10),
"lineD",
"",
},
},
})
10 changes: 5 additions & 5 deletions integration/loki_micro_services_test.go
@@ -207,13 +207,13 @@ func TestMicroServicesMultipleBucketSingleProvider(t *testing.T) {
cliQueryFrontend.Now = now

t.Run("ingest-logs", func(t *testing.T) {
// ingest logs to the previous period
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-36*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))

// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))

})

t.Run("query-lookback-default", func(t *testing.T) {
19 changes: 19 additions & 0 deletions pkg/loghttp/entry.go
@@ -18,6 +18,7 @@ func init() {
type Entry struct {
Timestamp time.Time
Line string
Labels string
Member:
Making this a string feels counterintuitive compared to using map[string]string, which is what we use via the LabelSet struct in our v1 push payloads in loghttp/push. Point being, I think we should use JSON directly (not JSON with embedded stringified JSON fields) when application/json is the Content-Type. When snappy encoding is used, logproto can use the string variants for consistency with how it treats labels.

@salvacorts (Contributor, Author), Jun 29, 2023:

The problem I see with making this a map, while keeping the labels for an entry in logproto as a string, is that converting from loghttp.Entry to logproto.Entry is going to be way slower.

Right now (both with string labels), we just cast the pointer for the entries array of a stream since they have the same memory layout.

entries := *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries))

If we decide to make one a map and another one a string, we won't be able to do that cast anymore (different memory layouts). Hence, we'll need to go entry by entry converting it from loghttp.Entry to logproto.Entry.

IMO, we should make both use the same layout.
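The zero-copy cast described above only works because both entry types share an identical memory layout. A simplified, standalone sketch of the idea (toy stand-ins for the real loghttp/logproto types):

```go
package main

import (
	"fmt"
	"time"
	"unsafe"
)

// Toy stand-ins for loghttp.Entry and logproto.Entry with identical
// field order and types, and therefore identical memory layouts.
type httpEntry struct {
	Timestamp time.Time
	Line      string
	Labels    string
}

type protoEntry struct {
	Timestamp time.Time
	Line      string
	Labels    string
}

// toProto reinterprets the slice header without copying, like the PR's
// *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries)). It is only valid
// because the layouts match exactly; if one type held a map and the
// other a string, this cast would be broken.
func toProto(in []httpEntry) []protoEntry {
	return *(*[]protoEntry)(unsafe.Pointer(&in))
}

func main() {
	in := []httpEntry{{Timestamp: time.Unix(0, 1), Line: "lineA", Labels: `{traceID="123"}`}}
	out := toProto(in)
	fmt.Println(out[0].Line, out[0].Labels) // same backing memory, no per-entry copy
}
```

If the layouts diverge, the only safe option is the entry-by-entry conversion shown in the next comment.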

@salvacorts (Contributor, Author):

I implemented this in f43dfab

This is what we need to do instead of the cast:

loki/pkg/loghttp/query.go, lines 236 to 239 in f43dfab:

    entries := make([]logproto.Entry, len(s.Entries), len(s.Entries))
    for i, e := range s.Entries {
        entries[i] = e.ToProto()
    }

If we decide to keep this as a string, we can revert the commit. If we decide to make logproto.Entry to use a map for labels, we can build on top of it.

Contributor:

According to our API docs, we use a JSON object for stream labels (https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki), so it makes sense to use a JSON object for line metadata as well. But you are right, @salvacorts, that there is no need to convert it to a string when we convert loghttp.Entry to logproto.Entry.

@owen-d (Member), Jul 6, 2023:

@salvacorts You're right, making this anything but a string here would result in performance losses, because it would need to parse labels on potentially every entry coming back from subqueries. Since this happens before we filter out unnecessary entries (over limit, etc.), it can quickly result in higher resource utilization, OOMs, etc.

Now, we have a few options:

  1. Keep this as a string in memory, but turn it into a map when returning the JSON response from the HTTP API. This will maintain performance (no conversion from protos) & correctly return a map via the JSON API.
  2. Use LabelSet for this field (stored as a map). This requires parsing strings -> json for potentially every log line returned from queriers (yikes!). Even when combined with (4), this would require transforming slice -> map.
  3. Use labels.Labels (stored as a slice). When combined with (4) this incurs no transformation cost and the resulting conversion to the json map for the external API is simple & relatively inexpensive.
  4. Due to the performance implications here, I've changed my opinion on making EntryAdapter.labels a string for parity with StreamAdapter.labels. Instead, I think we should use the LabelAdapter pattern to send the labels in the protos as a slice and combine it with (3) to not need to pay conversion costs.

Does this make sense?

P.S. We'll actually need to do this on the Entry and not EntryAdapter types (we only keep the adapter ones around for bench comparisons, but they're unused). Happy to talk through this more when I've got a bit more time, but we did this primarily for serialization optimizations.
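Option (3) boils down to keeping labels as a slice of pairs everywhere internally and only building the JSON object at the external API boundary. A hedged sketch of that conversion (the `Label` type here is a toy stand-in for `github.com/prometheus/prometheus/model/labels.Label`, not the real type):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Label mirrors the slice-of-pairs representation from options (3)/(4).
type Label struct {
	Name, Value string
}

// toJSONObject builds the JSON object only when rendering the external
// API response, so queriers never pay a string-parsing cost per entry.
func toJSONObject(ls []Label) ([]byte, error) {
	m := make(map[string]string, len(ls))
	for _, l := range ls {
		m[l.Name] = l.Value
	}
	return json.Marshal(m) // encoding/json emits map keys in sorted order
}

func main() {
	b, err := toJSONObject([]Label{{"traceID", "123"}, {"user", "a"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"traceID":"123","user":"a"}
}
```

The slice-to-map transform is O(n) per response rather than per subquery entry, which is the performance point being made above.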

Contributor:

I would vote for 3 and 4; that looks like the most optimal approach for us. WDYT @salvacorts?

@salvacorts (Contributor, Author):

Agree. Implemented it in 4028758. We now use labels.Labels in both loghttp.Entry and logproto.Entry.

I wonder if we should take care of duplicated labels. I think we may not need to, since the JSON push payload is an object, so it shouldn't have duplicate keys (is that somehow enforced?).

Note that proto3 supports maps. On the wire, a map is encoded the same as an array, so extra logic is needed to provide the map API. I ran a benchmark, and maps are ~3x slower to marshal and ~2x slower to unmarshal compared to arrays (happy to share/discuss the benchmark). Therefore I agree the way to go is using arrays in protobuf.
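On the duplicate-key question: JSON objects are not guaranteed unique by most parsers. Go's encoding/json, for example, accepts duplicate keys without error and keeps the last value, so deduplication would have to be enforced explicitly if it mattered. A quick standalone check of that behavior:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lastWins unmarshals a JSON object and returns the value stored under
// "traceID", demonstrating what happens when the key appears twice.
func lastWins(data []byte) (string, error) {
	var m map[string]string
	if err := json.Unmarshal(data, &m); err != nil {
		return "", err
	}
	return m["traceID"], nil
}

func main() {
	// Duplicate key: encoding/json does not reject it, the last value wins.
	v, err := lastWins([]byte(`{"traceID":"123","traceID":"456"}`))
	fmt.Println(v, err) // 456 <nil>
}
```

So nothing in the decoding path enforces uniqueness; the later occurrence silently overwrites the earlier one.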

}

func (e *Entry) UnmarshalJSON(data []byte) error {
@@ -46,6 +47,13 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
return
}
e.Line = v
case 2: // labels
il, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Labels = il
}
i++
})
@@ -67,6 +75,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
i := 0
var ts time.Time
var line string
var labels string
ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
var ok bool
switch i {
@@ -81,6 +90,13 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
return false
}
return true
case 2:
labels = iter.ReadString()
i++
if iter.Error != nil {
return false
}
return true
default:
iter.ReportError("error reading entry", "array must contains 2 values")
return false
@@ -90,6 +106,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
*((*[]Entry)(ptr)) = append(*((*[]Entry)(ptr)), Entry{
Timestamp: ts,
Line: line,
Labels: labels,
})
return true
}
@@ -126,6 +143,8 @@ func (EntryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteRaw(`"`)
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(e.Line)
stream.WriteMore()
stream.WriteString(e.Labels)
@owen-d (Member), Jun 23, 2023:

These types are used in the external API, meaning we'd return stringified labels as a field, which we shouldn't do. We should use JSON semantics here. Ideally, encoding and decoding should also round-trip, meaning JSON unmarshaling should expect a JSON object of non-indexed labels as well.

@salvacorts (Contributor, Author):

Done with f43dfab

stream.WriteArrayEnd()
}

121 changes: 119 additions & 2 deletions pkg/loghttp/query.go
@@ -55,9 +55,122 @@ func (q *QueryResponse) UnmarshalJSON(data []byte) error {
})
}

// PushRequest models a log stream push
// PushRequest models a log stream push but is unmarshalled to proto push format.
type PushRequest struct {
Streams []*Stream `json:"streams"`
Streams []LogProtoStream `json:"streams"`
Member:

Previously this stored a pointer; now it stores a struct. Not sure we want this (it would need to be tested independently for performance).

@salvacorts (Contributor, Author):

This comes from this PR:
#1145

The only reason I think this is a pointer is so that we don't make a copy of the stream when calling NewStream, which receives a pointer to loghttp.Stream:
https://github.com/grafana/loki/pull/1145/files#diff-71619e1c80a73b34eade235a55d012a0ddbb3375b8d4ac89c1f4fd672145b915R34

With Sandeep's changes, we no longer use that, since now we decode from JSON into loghttp.PushRequest and then cast it to logproto.PushRequest:

Streams: *(*[]logproto.Stream)(unsafe.Pointer(&request.Streams)),

So having that as a ptr or not shouldn't make much difference.

}

// LogProtoStream helps with unmarshalling of each log stream for push request.
// This might look un-necessary but without it the CPU usage in benchmarks was increasing by ~25% :shrug:
type LogProtoStream struct {
Member:

I think it makes more sense to implement UnmarshalJSON on logproto.Stream directly -- is that feasible?

@salvacorts (Contributor, Author), Jun 29, 2023:

I agree!

Edit: I think that's not possible right away. logproto.Stream is an alias for push.Stream, provided by the package github.com/grafana/loki/pkg/push, which is used in this package.

UnmarshalJSON uses the LabelSet, so pkg/push would need to import pkg/loki, which would end up in a cyclic dependency.
https://github.com/grafana/loki/pull/9694/files#diff-5b050fb13a302741e2f4a781fa54987e96da67696ae36ea41aa971ef431bfeccR84

I think this unmarshalling method makes sense to live here since:

  • This pkg is the one that needs to populate a logproto.Stream from JSON. Moreover, this JSON has a format for entries (a JSON array) that only this package should know/care about.
  • Working around the cyclic dependency would require more changes than implementing it here.

Contributor:

I think we can bring all the parsing code to a single place when we work on the endpoint for supporting native OTLP ingestion.

logproto.Stream
}

func (s *LogProtoStream) UnmarshalJSON(data []byte) error {
err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error {
switch string(key) {
case "stream":
labels := make(LabelSet)
err := jsonparser.ObjectEach(val, func(key, val []byte, _ jsonparser.ValueType, _ int) error {
labels[yoloString(key)] = yoloString(val)
return nil
})
if err != nil {
return err
}
s.Labels = labels.String()
case "values":
if ty == jsonparser.Null {
return nil
}
entries, err := unmarshalHTTPToLogProtoEntries(val)
if err != nil {
return err
}
s.Entries = entries
}
return nil
})
return err
}

func unmarshalHTTPToLogProtoEntries(data []byte) ([]logproto.Entry, error) {
var (
entries []logproto.Entry
parseError error
)
_, err := jsonparser.ArrayEach(data, func(value []byte, ty jsonparser.ValueType, _ int, err error) {
if err != nil || parseError != nil {
return
}
if ty == jsonparser.Null {
return
}
e, err := unmarshalHTTPToLogProtoEntry(value)
if err != nil {
parseError = err
return
}
entries = append(entries, e)
})
if parseError != nil {
return nil, parseError
}
if err != nil {
Member:

This feels weird -- is there a chance err can be non-nil but parseError is nil?

@salvacorts (Contributor, Author):

The error handling in the implementation of jsonparser.ArrayEach doesn't seem correct to me. The error passed to the callback is odd, as it will always be nil. Then, after calling the callback, the library checks the value of that error, which will always be nil since it is passed to the callback by value. There's an issue open for this:
buger/jsonparser#255

IIUC, parseError can be nil while err is non-nil, since ArrayEach can return, for example, a MalformedJsonError. Having said that, if err is not nil, we should return err, not parseError.

@salvacorts (Contributor, Author):

Done with 384d556
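The fix relies on capturing the parse error in a closure variable instead of the callback's (always-nil) err parameter. A simplified, standalone sketch of that pattern; `forEach` here is an invented stand-in that mimics jsonparser.ArrayEach's callback style, not the real library API:

```go
package main

import (
	"errors"
	"fmt"
)

// forEach mimics a callback-style array walker like jsonparser.ArrayEach:
// the err parameter it passes to the callback is always nil, so callbacks
// cannot report failures through it.
func forEach(items []string, cb func(item string, err error)) error {
	for _, it := range items {
		cb(it, nil) // err is passed by value; mutating it inside cb is invisible here
	}
	return nil
}

func parseAll(items []string) ([]string, error) {
	var (
		out        []string
		parseError error // captured by the closure, as in the PR
	)
	err := forEach(items, func(item string, _ error) {
		if parseError != nil {
			return // skip further work after the first failure
		}
		if item == "" {
			parseError = errors.New("empty entry")
			return
		}
		out = append(out, item)
	})
	if err != nil { // walker-level error (e.g. malformed input) takes precedence
		return nil, err
	}
	if parseError != nil {
		return nil, parseError
	}
	return out, nil
}

func main() {
	got, err := parseAll([]string{"a", "", "b"})
	fmt.Println(got, err)
}
```

Checking the walker's own return value before parseError matches the ordering agreed on above: a malformed-input error from the library should win over (possibly absent) callback errors.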

return nil, parseError
}

return entries, nil
}

func unmarshalHTTPToLogProtoEntry(data []byte) (logproto.Entry, error) {
var (
i int
parseError error
e logproto.Entry
)
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
// assert that both items in array are of type string
if (i == 0 || i == 1) && t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
} else if i == 2 && t != jsonparser.Object {
parseError = jsonparser.MalformedObjectError
return
}
switch i {
case 0: // timestamp
ts, err := jsonparser.ParseInt(value)
if err != nil {
parseError = err
return
}
e.Timestamp = time.Unix(0, ts)
case 1: // value
v, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Line = v
case 2: // labels
labels := make(LabelSet)
err := jsonparser.ObjectEach(value, func(key, val []byte, _ jsonparser.ValueType, _ int) error {
labels[yoloString(key)] = yoloString(val)
Member:

Don't we need to check types for the values here to ensure they're strings?

@salvacorts (Contributor, Author):

Done with 384d556

return nil
})
if err != nil {
parseError = err
return
}
e.Labels = labels.String()
}
i++
})
if parseError != nil {
return e, parseError
}
return e, err
}

// ResultType holds the type of the result
@@ -377,3 +490,7 @@ func labelVolumeLimit(r *http.Request) error {

return nil
}

func yoloString(buf []byte) string {
return *((*string)(unsafe.Pointer(&buf)))
}
Contributor:

This is the scariest thing in the PR :) We should use it only if we can guarantee the buffer is never mutated afterwards; with this many functions in the call stack, somebody could introduce an optimization that reuses the slice, and that would break our whole implementation. Why do we need it here?

@salvacorts (Contributor, Author):

As discussed in Slack, removed.
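For context, a standalone demonstration (toy code, not Loki's) of why the reviewer calls yoloString scary: the returned string aliases the byte slice's backing array instead of copying it, so any later mutation of the buffer silently changes the supposedly immutable string:

```go
package main

import (
	"fmt"
	"unsafe"
)

// yoloString is the zero-copy conversion removed in f8c33aa: the string
// shares the byte slice's backing array rather than copying it.
func yoloString(buf []byte) string {
	return *(*string)(unsafe.Pointer(&buf))
}

func main() {
	buf := []byte("traceID")
	s := yoloString(buf)

	buf[0] = 'X' // whoever still holds buf can rewrite the "string" in place

	fmt.Println(s) // prints "XraceID": Go strings are supposed to be immutable
}
```

Replacing it with a plain string([]byte) conversion costs one copy but removes the aliasing hazard entirely, which is the trade-off this PR settled on.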
