Skip to content

Commit

Permalink
Fix retrieval of peer.service from Zipkin's MESSAGE_ADDR annotation (#…
Browse files Browse the repository at this point in the history
…3312)

Signed-off-by: Martin Schimandl <[email protected]>

Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
Git-Jiro and yurishkuro authored Oct 14, 2021
1 parent b8e8fc2 commit 0d1727c
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 3 deletions.
23 changes: 23 additions & 0 deletions cmd/collector/app/zipkin/fixtures/zipkin_03.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[
{
"traceId": "091f00370361e578",
"parentId": "c26551047c72d19",
"id": "188bb8428fc7e477",
"kind": "PRODUCER",
"name": "send",
"timestamp": 1597704629675602,
"duration": 9550570,
"localEndpoint":
{
"serviceName": "schemas-service"
},
"remoteEndpoint":
{
"serviceName": "kafka"
},
"tags":
{
"kafka.topic": "schema-changed"
}
}
]
52 changes: 50 additions & 2 deletions cmd/collector/app/zipkin/jsonv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
package zipkin

import (
"fmt"
"os"
"testing"

"github.com/go-openapi/swag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
"github.com/jaegertracing/jaeger/swagger-gen/models"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)
Expand Down Expand Up @@ -54,7 +55,6 @@ func TestLCFromLocalEndpoint(t *testing.T) {
var spans models.ListOfSpans
loadJSON(t, "fixtures/zipkin_02.json", &spans)
tSpans, err := spansV2ToThrift(spans)
fmt.Println(tSpans[0])
require.NoError(t, err)
assert.Equal(t, len(tSpans), 1)
var ts int64 = 1
Expand All @@ -67,6 +67,54 @@ func TestLCFromLocalEndpoint(t *testing.T) {
assert.Equal(t, tSpan, tSpans[0])
}

func TestMissingKafkaEndpoint(t *testing.T) {
var spans models.ListOfSpans
loadJSON(t, "fixtures/zipkin_03.json", &spans)
tSpans, err := spansV2ToThrift(spans)
require.NoError(t, err)
assert.Equal(t, 1, len(tSpans))
var ts int64 = 1597704629675602
var d int64 = 9550570
var parentId int64 = 0xc26551047c72d19
var endpoint1 = zipkincore.Endpoint{ServiceName: "schemas-service"}
var endpoint2 = zipkincore.Endpoint{ServiceName: "schemas-service"}
var endpoint3 = zipkincore.Endpoint{ServiceName: "kafka"}

tSpan := &zipkincore.Span{ID: 0x188bb8428fc7e477, TraceID: 0x091f00370361e578, ParentID: &parentId,
Name: "send", Duration: &d, Timestamp: &ts,
Annotations: []*zipkincore.Annotation{
{
Host: &endpoint1,
Timestamp: ts,
Value: zipkincore.MESSAGE_SEND,
},
},
BinaryAnnotations: []*zipkincore.BinaryAnnotation{
{
Host: &endpoint2,
Key: "kafka.topic",
Value: []byte("schema-changed"),
AnnotationType: zipkincore.AnnotationType_STRING},
{

Key: zipkincore.MESSAGE_ADDR,
Host: &endpoint3,
AnnotationType: zipkincore.AnnotationType_BOOL,
},
},
}
assert.Equal(t, tSpan, tSpans[0])

tTags := []model.KeyValue{
{Key: "kafka.topic", VStr: "schema-changed"},
{Key: "peer.service", VStr: "kafka"},
}
var jaegerspan []*model.Span
jaegerspan, err = zipkin.ToDomainSpan(tSpans[0])
require.NoError(t, err)
assert.Equal(t, tTags, jaegerspan[0].GetTags())
}

func TestKindToThrift(t *testing.T) {
tests := []struct {
ts int64
Expand Down
2 changes: 1 addition & 1 deletion model/converter/thrift/zipkin/to_domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (td toDomain) getTags(binAnnotations []*zipkincore.BinaryAnnotation, tagInc
value := string(annotation.Value)
tag := model.String(string(ext.Component), value)
retMe = append(retMe, tag)
case zipkincore.SERVER_ADDR, zipkincore.CLIENT_ADDR:
case zipkincore.SERVER_ADDR, zipkincore.CLIENT_ADDR, zipkincore.MESSAGE_ADDR:
retMe = td.getPeerTags(annotation.Host, retMe)
default:
tag, err := td.transformBinaryAnnotation(annotation)
Expand Down

0 comments on commit 0d1727c

Please sign in to comment.