From a407b30dec6c219f3fe4c7254225aad8e7ef4bc0 Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Mon, 11 Apr 2022 11:36:11 +0200 Subject: [PATCH] matrix merger implementation --- pkg/handler/loki.go | 30 ++++- pkg/loki/matrix_merger.go | 63 ++++++++++ pkg/loki/matrix_merger_test.go | 112 ++++++++++++++++++ pkg/loki/{merger.go => streams_merger.go} | 6 +- ...{merger_test.go => streams_merger_test.go} | 12 +- 5 files changed, 208 insertions(+), 15 deletions(-) create mode 100644 pkg/loki/matrix_merger.go create mode 100644 pkg/loki/matrix_merger_test.go rename pkg/loki/{merger.go => streams_merger.go} (93%) rename pkg/loki/{merger_test.go => streams_merger_test.go} (89%) diff --git a/pkg/handler/loki.go b/pkg/handler/loki.go index 6cb8ce066..ccdde38db 100644 --- a/pkg/handler/loki.go +++ b/pkg/handler/loki.go @@ -104,14 +104,25 @@ func fetchParallel(lokiClient httpclient.HTTPClient, queries []string) ([]byte, // Aggregate results first := true var resp model.QueryResponse - merger := loki.NewStreamMerger() + var streamMerger *loki.StreamMerger + var matrixMerger *loki.MatrixMerger for r := range resChan { + //TODO: resp.Data.Stats are incorrect doing that + if first { + first = false + resp = r + } + if streams, ok := r.Data.Result.(model.Streams); ok { - merger.Add(streams) - if first { - first = false - resp = r + if streamMerger == nil { + streamMerger = loki.NewStreamMerger() + } + streamMerger.AddStreams(streams) + } else if matrix, ok := r.Data.Result.(model.Matrix); ok { + if matrixMerger == nil { + matrixMerger = loki.NewMatrixMerger() } + matrixMerger.AddMatrix(matrix) } else { return nil, http.StatusInternalServerError, fmt.Errorf("loki returned an unexpected type: %T", r.Data.Result) } @@ -122,7 +133,14 @@ func fetchParallel(lokiClient httpclient.HTTPClient, queries []string) ([]byte, } // Encode back to json - resp.Data.Result = merger.GetStreams() + if streamMerger != nil { + resp.Data.Result = streamMerger.GetStreams() + } else if matrixMerger != nil { + resp.Data.Result = matrixMerger.GetStreams() + } else { + return nil, http.StatusInternalServerError, fmt.Errorf("cannot merge result. Data should either be stream or matrix") + } + encoded, err := json.Marshal(resp) if err != nil { return nil, http.StatusInternalServerError, err diff --git a/pkg/loki/matrix_merger.go b/pkg/loki/matrix_merger.go new file mode 100644 index 000000000..f2d93f815 --- /dev/null +++ b/pkg/loki/matrix_merger.go @@ -0,0 +1,63 @@ +package loki + +import ( + "github.com/netobserv/network-observability-console-plugin/pkg/model" + pmodel "github.com/prometheus/common/model" +) + +type MatrixMerger struct { + index map[string]indexedSampleStream + merged model.Matrix +} + +func NewMatrixMerger() *MatrixMerger { + return &MatrixMerger{ + index: map[string]indexedSampleStream{}, + merged: model.Matrix{}, + } +} + +type indexedSampleStream struct { + sampleStream pmodel.SampleStream + values map[string]interface{} + index int +} + +func (m *MatrixMerger) AddMatrix(from model.Matrix) model.Matrix { + for _, sampleStream := range from { + skey := sampleStream.Metric.String() + idxSampleStream, sampleStreamExists := m.index[skey] + if !sampleStreamExists { + // SampleStream doesn't exist => create new index + idxSampleStream = indexedSampleStream{ + sampleStream: sampleStream, + values: map[string]interface{}{}, + index: len(m.index), + } + } + // Merge content (values) + for _, v := range sampleStream.Values { + vkey := v.String() + if _, valueExists := idxSampleStream.values[vkey]; !valueExists { + // Add value to the existing sampleStream, and mark it as existing in idxSampleStream.values + idxSampleStream.values[vkey] = nil + if sampleStreamExists { + idxSampleStream.sampleStream.Values = append(m.index[skey].sampleStream.Values, v) + } + } // Else: entry found => ignore duplicate + } + // Add or overwrite index + m.index[skey] = idxSampleStream + if !sampleStreamExists { + // SampleStream doesn't exist => append it + m.merged = append(m.merged, idxSampleStream.sampleStream) + } else { + m.merged[idxSampleStream.index] = idxSampleStream.sampleStream + } + } + return m.merged +} + +func (m *MatrixMerger) GetStreams() model.Matrix { + return m.merged +} diff --git a/pkg/loki/matrix_merger_test.go b/pkg/loki/matrix_merger_test.go new file mode 100644 index 000000000..f81c62b58 --- /dev/null +++ b/pkg/loki/matrix_merger_test.go @@ -0,0 +1,112 @@ +package loki + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/netobserv/network-observability-console-plugin/pkg/model" + pmodel "github.com/prometheus/common/model" +) + +func TestMatrixMerge(t *testing.T) { + now := pmodel.Now() + merger := NewMatrixMerger() + baseline := pmodel.SampleStream{ + Metric: pmodel.Metric{ + "foo": "bar", + }, + Values: []pmodel.SamplePair{{ + Timestamp: now, + Value: pmodel.SampleValue(42), + }}, + } + merger.AddMatrix(model.Matrix{baseline}) + + // Different metric, different value pair => no dedup + merged := merger.AddMatrix(model.Matrix{{ + Metric: pmodel.Metric{ + "foo": "bar", + "foo2": "bar2", + }, + Values: baseline.Values, + }, { + Metric: baseline.Metric, + Values: []pmodel.SamplePair{{ + Timestamp: now, + Value: pmodel.SampleValue(12), + }}, + }}) + assert.Len(t, merged, 2) + assert.Len(t, merged[0].Values, 2) + assert.Len(t, merged[1].Values, 1) + + // Same metrics in different order => no dedup + merged = merger.AddMatrix(model.Matrix{{ + Metric: pmodel.Metric{ + "foo2": "bar2", + "foo": "bar", + }, + Values: baseline.Values, + }, { + Metric: pmodel.Metric{ + "foo2": "bar2", + "foo": "bar", + }, + Values: baseline.Values, + }, { + Metric: pmodel.Metric{ + "foo": "bar", + "foo2": "bar2", + }, + Values: baseline.Values, + }}) + assert.Len(t, merged, 2) + assert.Len(t, merged[0].Values, 2) + assert.Len(t, merged[1].Values, 1) + + // Different timestamp => no dedup + merged = merger.AddMatrix(model.Matrix{{ + Metric: baseline.Metric, + Values: []pmodel.SamplePair{{ + Timestamp: now.Add(time.Hour), + Value: pmodel.SampleValue(12), + }}, + }}) + assert.Len(t, merged, 2) + assert.Len(t, merged[0].Values, 3) + assert.Len(t, merged[1].Values, 1) + + // some dedup + merged = merger.AddMatrix(model.Matrix{{ + // changed value => no dedup + Metric: baseline.Metric, + Values: []pmodel.SamplePair{{ + Timestamp: now, + Value: pmodel.SampleValue(8), + }}, + }, { + // changed value => no dedup + Metric: baseline.Metric, + Values: []pmodel.SamplePair{{ + Timestamp: now, + Value: pmodel.SampleValue(0), + }}, + }, { + // same as previously modified timestamp => will be added + Metric: baseline.Metric, + Values: []pmodel.SamplePair{{ + Timestamp: now.Add(time.Hour), + Value: pmodel.SampleValue(42), + }}, + }, + // baseline itself => must be ignored + baseline, + }) + + // Different timestamp + assert.Len(t, merged, 2) + assert.Len(t, merged[0].Values, 6) + assert.Len(t, merged[1].Values, 1) +} diff --git a/pkg/loki/merger.go b/pkg/loki/streams_merger.go similarity index 93% rename from pkg/loki/merger.go rename to pkg/loki/streams_merger.go index c4eeb2f36..2c97d0792 100644 --- a/pkg/loki/merger.go +++ b/pkg/loki/streams_merger.go @@ -12,8 +12,8 @@ type StreamMerger struct { merged model.Streams } -func NewStreamMerger() StreamMerger { - return StreamMerger{ +func NewStreamMerger() *StreamMerger { + return &StreamMerger{ index: map[string]indexedStream{}, merged: model.Streams{}, } @@ -45,7 +45,7 @@ func uniqueEntry(e *model.Entry) string { return e.Timestamp.String() + e.Line } -func (m *StreamMerger) Add(from model.Streams) model.Streams { +func (m *StreamMerger) AddStreams(from model.Streams) model.Streams { for _, stream := range from { lkey := uniqueStream(&stream) idxStream, streamExists := m.index[lkey] diff --git a/pkg/loki/merger_test.go b/pkg/loki/streams_merger_test.go similarity index 89% rename from pkg/loki/merger_test.go rename to pkg/loki/streams_merger_test.go index 6150c27f0..5c87c7602 100644 --- a/pkg/loki/merger_test.go +++ b/pkg/loki/streams_merger_test.go @@ -9,7 +9,7 @@ import ( "github.com/netobserv/network-observability-console-plugin/pkg/model" ) -func TestMerge(t *testing.T) { +func TestStreamsMerge(t *testing.T) { now := time.Now() merger := NewStreamMerger() baseline := model.Stream{ @@ -21,10 +21,10 @@ func TestMerge(t *testing.T) { Line: "{key: value}", }}, } - merger.Add(model.Streams{baseline}) + merger.AddStreams(model.Streams{baseline}) // Different label, different line => no dedup - merged := merger.Add(model.Streams{{ + merged := merger.AddStreams(model.Streams{{ Labels: map[string]string{ "foo": "bar", "foo2": "bar2", @@ -42,7 +42,7 @@ func TestMerge(t *testing.T) { assert.Len(t, merged[1].Entries, 1) // Same labels in different order => no dedup - merged = merger.Add(model.Streams{{ + merged = merger.AddStreams(model.Streams{{ Labels: map[string]string{ "foo2": "bar2", "foo": "bar", @@ -66,7 +66,7 @@ func TestMerge(t *testing.T) { assert.Len(t, merged[1].Entries, 1) // Different timestamp => no dedup - merged = merger.Add(model.Streams{{ + merged = merger.AddStreams(model.Streams{{ Labels: baseline.Labels, Entries: []model.Entry{{ Timestamp: now.Add(time.Hour), @@ -78,7 +78,7 @@ func TestMerge(t *testing.T) { assert.Len(t, merged[1].Entries, 1) // some dedup - merged = merger.Add(model.Streams{{ + merged = merger.AddStreams(model.Streams{{ // changed line => no dedup Labels: baseline.Labels, Entries: []model.Entry{{