Skip to content

Commit

Permalink
pkg/promtail/client: Handle fingerprint hash collisions (grafana#1254)
Browse files Browse the repository at this point in the history
* Handle fingerprint hash collisions

Signed-off-by: Peter Štibraný <[email protected]>

* Instead of using fingerprints, use label strings as map keys.

Label strings are properly sorted key=value pairs. This solution
produces more garbage, but doesn't reinvent its own hashing, and keeps
code simple.
This seems to be a good tradeoff for promtail.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored and cyriltovena committed Nov 26, 2019
1 parent c4234e0 commit 125cfbf
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 19 deletions.
8 changes: 4 additions & 4 deletions pkg/logentry/stages/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (ts *timestampStage) Process(labels model.LabelSet, extracted map[string]in
// The timestamp has been correctly parsed, so we should store it in the map
// containing the last known timestamp used by the "fudge" action on failure.
if *ts.cfg.ActionOnFailure == TimestampActionOnFailureFudge {
ts.lastKnownTimestamps.Add(labels.FastFingerprint(), *t)
ts.lastKnownTimestamps.Add(labels.String(), *t)
}
}

Expand Down Expand Up @@ -193,8 +193,8 @@ func (ts *timestampStage) processActionOnFailure(labels model.LabelSet, t *time.
}

func (ts *timestampStage) processActionOnFailureFudge(labels model.LabelSet, t *time.Time) {
labelsFingerprint := labels.FastFingerprint()
lastTimestamp, ok := ts.lastKnownTimestamps.Get(labelsFingerprint)
labelsStr := labels.String()
lastTimestamp, ok := ts.lastKnownTimestamps.Get(labelsStr)

// If the last known timestamp is unknown (ie. has not been successfully parsed yet)
// there's nothing we can do, so we're going to keep the current timestamp
Expand All @@ -206,5 +206,5 @@ func (ts *timestampStage) processActionOnFailureFudge(labels model.LabelSet, t *
*t = lastTimestamp.(time.Time).Add(1 * time.Nanosecond)

// Store the fudged timestamp, so that a subsequent fudged timestamp will be 1ns after it
ts.lastKnownTimestamps.Add(labelsFingerprint, *t)
ts.lastKnownTimestamps.Add(labelsStr, *t)
}
25 changes: 24 additions & 1 deletion pkg/logentry/stages/timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,29 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) {
time.Unix(1, 0),
},
},
"labels with colliding fingerprints should have independent timestamps when fudging": {
config: TimestampConfig{
Source: "time",
Format: time.RFC3339Nano,
ActionOnFailure: lokiutil.StringRef(TimestampActionOnFailureFudge),
},
inputEntries: []inputEntry{
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}, extracted: map[string]interface{}{"time": "2019-10-01T01:02:03.400000000Z"}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}, extracted: map[string]interface{}{"time": "2019-10-01T01:02:03.800000000Z"}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}, extracted: map[string]interface{}{}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}, extracted: map[string]interface{}{}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}, extracted: map[string]interface{}{}},
{timestamp: time.Unix(1, 0), labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}, extracted: map[string]interface{}{}},
},
expectedTimestamps: []time.Time{
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.400000000Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.800000000Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.400000001Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.800000001Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.400000002Z"),
mustParseTime(time.RFC3339Nano, "2019-10-01T01:02:03.800000002Z"),
},
},
}

for testName, testData := range tests {
Expand All @@ -360,7 +383,7 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) {
entry := ""

s.Process(inputEntry.labels, extracted, &timestamp, &entry)
assert.Equal(t, testData.expectedTimestamps[i], timestamp)
assert.Equal(t, testData.expectedTimestamps[i], timestamp, "entry: %d", i)
}
})
}
Expand Down
32 changes: 18 additions & 14 deletions pkg/promtail/client/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,21 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
)

// batch holds pending log streams waiting to be sent to Loki, and it's used
// to reduce the number of push requests to Loki aggregating multiple log streams
// and entries in a single batch request. In case of multi-tenant Promtail, log
// streams for each tenant are stored in a dedicated batch.
type batch struct {
streams map[model.Fingerprint]*logproto.Stream
streams map[string]*logproto.Stream
bytes int
createdAt time.Time
}

func newBatch(entries ...entry) *batch {
b := &batch{
streams: map[model.Fingerprint]*logproto.Stream{},
streams: map[string]*logproto.Stream{},
bytes: 0,
createdAt: time.Now(),
}
Expand All @@ -39,15 +38,15 @@ func (b *batch) add(entry entry) {
b.bytes += len(entry.Line)

// Append the entry to an already existing stream (if any)
fp := entry.labels.FastFingerprint()
if stream, ok := b.streams[fp]; ok {
labels := entry.labels.String()
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, entry.Entry)
return
}

// Add the entry as a new stream
b.streams[fp] = &logproto.Stream{
Labels: entry.labels.String(),
b.streams[labels] = &logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{entry.Entry},
}
}
Expand All @@ -71,6 +70,17 @@ func (b *batch) age() time.Duration {
// encode the batch as snappy-compressed push request, and returns
// the encoded bytes and the number of encoded entries
func (b *batch) encode() ([]byte, int, error) {
req, entriesCount := b.createPushRequest()
buf, err := proto.Marshal(req)
if err != nil {
return nil, 0, err
}
buf = snappy.Encode(nil, buf)
return buf, entriesCount, nil
}

// creates push request and returns it, together with number of entries
func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
req := logproto.PushRequest{
Streams: make([]*logproto.Stream, 0, len(b.streams)),
}
Expand All @@ -80,11 +90,5 @@ func (b *batch) encode() ([]byte, int, error) {
req.Streams = append(req.Streams, stream)
entriesCount += len(stream.Entries)
}

buf, err := proto.Marshal(&req)
if err != nil {
return nil, 0, err
}
buf = snappy.Encode(nil, buf)
return buf, entriesCount, nil
return &req, entriesCount
}
33 changes: 33 additions & 0 deletions pkg/promtail/client/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package client

import (
"fmt"
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -103,3 +106,33 @@ func TestBatch_encode(t *testing.T) {
})
}
}

func TestHashCollisions(t *testing.T) {
b := newBatch()

ls1 := model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}
ls2 := model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}

require.False(t, ls1.Equal(ls2))
require.Equal(t, ls1.FastFingerprint(), ls2.FastFingerprint())

const entriesPerLabel = 10

for i := 0; i < entriesPerLabel; i++ {
b.add(entry{labels: ls1, Entry: logproto.Entry{time.Now(), fmt.Sprintf("line %d", i)}})
b.add(entry{labels: ls2, Entry: logproto.Entry{time.Now(), fmt.Sprintf("line %d", i)}})
}

// make sure that colliding labels are stored properly as independent streams
req, entries := b.createPushRequest()
assert.Len(t, req.Streams, 2)
assert.Equal(t, 2*entriesPerLabel, entries)

if req.Streams[0].Labels == ls1.String() {
assert.Equal(t, ls1.String(), req.Streams[0].Labels)
assert.Equal(t, ls2.String(), req.Streams[1].Labels)
} else {
assert.Equal(t, ls2.String(), req.Streams[0].Labels)
assert.Equal(t, ls1.String(), req.Streams[1].Labels)
}
}

0 comments on commit 125cfbf

Please sign in to comment.