Skip to content

Commit

Permalink
matrix merger implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Apr 11, 2022
1 parent daccc0b commit a407b30
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 15 deletions.
30 changes: 24 additions & 6 deletions pkg/handler/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,25 @@ func fetchParallel(lokiClient httpclient.HTTPClient, queries []string) ([]byte,
// Aggregate results
first := true
var resp model.QueryResponse
merger := loki.NewStreamMerger()
var streamMerger *loki.StreamMerger
var matrixMerger *loki.MatrixMerger
for r := range resChan {
//TODO: resp.Data.Stats are incorrect doing that
if first {
first = false
resp = r
}

if streams, ok := r.Data.Result.(model.Streams); ok {
merger.Add(streams)
if first {
first = false
resp = r
if streamMerger == nil {
streamMerger = loki.NewStreamMerger()
}
streamMerger.AddStreams(streams)
} else if matrix, ok := r.Data.Result.(model.Matrix); ok {
if matrixMerger == nil {
matrixMerger = loki.NewMatrixMerger()
}
matrixMerger.AddMatrix(matrix)
} else {
return nil, http.StatusInternalServerError, fmt.Errorf("loki returned an unexpected type: %T", r.Data.Result)
}
Expand All @@ -122,7 +133,14 @@ func fetchParallel(lokiClient httpclient.HTTPClient, queries []string) ([]byte,
}

// Encode back to json
resp.Data.Result = merger.GetStreams()
if streamMerger != nil {
resp.Data.Result = streamMerger.GetStreams()
} else if matrixMerger != nil {
resp.Data.Result = matrixMerger.GetStreams()
} else {
return nil, http.StatusInternalServerError, fmt.Errorf("cannot merge result. Data should either be stream or matrix")
}

encoded, err := json.Marshal(resp)
if err != nil {
return nil, http.StatusInternalServerError, err
Expand Down
63 changes: 63 additions & 0 deletions pkg/loki/matrix_merger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package loki

import (
"github.com/netobserv/network-observability-console-plugin/pkg/model"
pmodel "github.com/prometheus/common/model"
)

type MatrixMerger struct {
index map[string]indexedSampleStream
merged model.Matrix
}

func NewMatrixMerger() *MatrixMerger {
return &MatrixMerger{
index: map[string]indexedSampleStream{},
merged: model.Matrix{},
}
}

type indexedSampleStream struct {
sampleStream pmodel.SampleStream
values map[string]interface{}
index int
}

func (m *MatrixMerger) AddMatrix(from model.Matrix) model.Matrix {
for _, sampleStream := range from {
skey := sampleStream.Metric.String()
idxSampleStream, sampleStreamExists := m.index[skey]
if !sampleStreamExists {
// SampleStream doesn't exist => create new index
idxSampleStream = indexedSampleStream{
sampleStream: sampleStream,
values: map[string]interface{}{},
index: len(m.index),
}
}
// Merge content (values)
for _, v := range sampleStream.Values {
vkey := v.String()
if _, valueExists := idxSampleStream.values[vkey]; !valueExists {
// Add value to the existing sampleStream, and mark it as existing in idxSampleStream.values
idxSampleStream.values[vkey] = nil
if sampleStreamExists {
idxSampleStream.sampleStream.Values = append(m.index[skey].sampleStream.Values, v)
}
} // Else: entry found => ignore duplicate
}
// Add or overwrite index
m.index[skey] = idxSampleStream
if !sampleStreamExists {
// SampleStream doesn't exist => append it
m.merged = append(m.merged, idxSampleStream.sampleStream)
} else {
m.merged[idxSampleStream.index] = idxSampleStream.sampleStream
}
}
return m.merged
}

func (m *MatrixMerger) GetStreams() model.Matrix {
return m.merged
}
112 changes: 112 additions & 0 deletions pkg/loki/matrix_merger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package loki

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/netobserv/network-observability-console-plugin/pkg/model"
pmodel "github.com/prometheus/common/model"
)

func TestMatrixMerge(t *testing.T) {
now := pmodel.Now()
merger := NewMatrixMerger()
baseline := pmodel.SampleStream{
Metric: pmodel.Metric{
"foo": "bar",
},
Values: []pmodel.SamplePair{{
Timestamp: now,
Value: pmodel.SampleValue(42),
}},
}
merger.AddMatrix(model.Matrix{baseline})

// Different metric, different value pair => no dedup
merged := merger.AddMatrix(model.Matrix{{
Metric: pmodel.Metric{
"foo": "bar",
"foo2": "bar2",
},
Values: baseline.Values,
}, {
Metric: baseline.Metric,
Values: []pmodel.SamplePair{{
Timestamp: now,
Value: pmodel.SampleValue(12),
}},
}})
assert.Len(t, merged, 2)
assert.Len(t, merged[0].Values, 2)
assert.Len(t, merged[1].Values, 1)

// Same metrics in different order => no dedup
merged = merger.AddMatrix(model.Matrix{{
Metric: pmodel.Metric{
"foo2": "bar2",
"foo": "bar",
},
Values: baseline.Values,
}, {
Metric: pmodel.Metric{
"foo2": "bar2",
"foo": "bar",
},
Values: baseline.Values,
}, {
Metric: pmodel.Metric{
"foo": "bar",
"foo2": "bar2",
},
Values: baseline.Values,
}})
assert.Len(t, merged, 2)
assert.Len(t, merged[0].Values, 2)
assert.Len(t, merged[1].Values, 1)

// Different timestamp => no dedup
merged = merger.AddMatrix(model.Matrix{{
Metric: baseline.Metric,
Values: []pmodel.SamplePair{{
Timestamp: now.Add(time.Hour),
Value: pmodel.SampleValue(12),
}},
}})
assert.Len(t, merged, 2)
assert.Len(t, merged[0].Values, 3)
assert.Len(t, merged[1].Values, 1)

// some dedup
merged = merger.AddMatrix(model.Matrix{{
// changed value => no dedup
Metric: baseline.Metric,
Values: []pmodel.SamplePair{{
Timestamp: now,
Value: pmodel.SampleValue(8),
}},
}, {
// changed value => no dedup
Metric: baseline.Metric,
Values: []pmodel.SamplePair{{
Timestamp: now,
Value: pmodel.SampleValue(0),
}},
}, {
// same as previously modified timestamp => will be added
Metric: baseline.Metric,
Values: []pmodel.SamplePair{{
Timestamp: now.Add(time.Hour),
Value: pmodel.SampleValue(42),
}},
},
// baseline itself => must be ignored
baseline,
})

// Different timestamp
assert.Len(t, merged, 2)
assert.Len(t, merged[0].Values, 6)
assert.Len(t, merged[1].Values, 1)
}
6 changes: 3 additions & 3 deletions pkg/loki/merger.go → pkg/loki/streams_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type StreamMerger struct {
merged model.Streams
}

func NewStreamMerger() StreamMerger {
return StreamMerger{
func NewStreamMerger() *StreamMerger {
return &StreamMerger{
index: map[string]indexedStream{},
merged: model.Streams{},
}
Expand Down Expand Up @@ -45,7 +45,7 @@ func uniqueEntry(e *model.Entry) string {
return e.Timestamp.String() + e.Line
}

func (m *StreamMerger) Add(from model.Streams) model.Streams {
func (m *StreamMerger) AddStreams(from model.Streams) model.Streams {
for _, stream := range from {
lkey := uniqueStream(&stream)
idxStream, streamExists := m.index[lkey]
Expand Down
12 changes: 6 additions & 6 deletions pkg/loki/merger_test.go → pkg/loki/streams_merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/netobserv/network-observability-console-plugin/pkg/model"
)

func TestMerge(t *testing.T) {
func TestStreamsMerge(t *testing.T) {
now := time.Now()
merger := NewStreamMerger()
baseline := model.Stream{
Expand All @@ -21,10 +21,10 @@ func TestMerge(t *testing.T) {
Line: "{key: value}",
}},
}
merger.Add(model.Streams{baseline})
merger.AddStreams(model.Streams{baseline})

// Different label, different line => no dedup
merged := merger.Add(model.Streams{{
merged := merger.AddStreams(model.Streams{{
Labels: map[string]string{
"foo": "bar",
"foo2": "bar2",
Expand All @@ -42,7 +42,7 @@ func TestMerge(t *testing.T) {
assert.Len(t, merged[1].Entries, 1)

// Same labels in different order => no dedup
merged = merger.Add(model.Streams{{
merged = merger.AddStreams(model.Streams{{
Labels: map[string]string{
"foo2": "bar2",
"foo": "bar",
Expand All @@ -66,7 +66,7 @@ func TestMerge(t *testing.T) {
assert.Len(t, merged[1].Entries, 1)

// Different timestamp => no dedup
merged = merger.Add(model.Streams{{
merged = merger.AddStreams(model.Streams{{
Labels: baseline.Labels,
Entries: []model.Entry{{
Timestamp: now.Add(time.Hour),
Expand All @@ -78,7 +78,7 @@ func TestMerge(t *testing.T) {
assert.Len(t, merged[1].Entries, 1)

// some dedup
merged = merger.Add(model.Streams{{
merged = merger.AddStreams(model.Streams{{
// changed line => no dedup
Labels: baseline.Labels,
Entries: []model.Entry{{
Expand Down

0 comments on commit a407b30

Please sign in to comment.