Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unit test for OTLP receiver in jaeger-v1 #6541

Merged
merged 30 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
5ce9b7e
unit test
chahatsagarmain Jan 13, 2025
1e1208e
changes
chahatsagarmain Jan 14, 2025
bd2ea16
Merge branch 'main' into e2e-otlp
chahatsagarmain Jan 15, 2025
b1116f5
logging tenant
chahatsagarmain Jan 15, 2025
24b9b7e
http test
chahatsagarmain Jan 16, 2025
bcc716a
resolved changes
chahatsagarmain Jan 18, 2025
e1d95b2
atomic operation on shared var
chahatsagarmain Jan 18, 2025
5ff0d50
Update cmd/collector/app/span_processor_test.go
yurishkuro Jan 18, 2025
105a171
use assert.eventually
chahatsagarmain Jan 18, 2025
25b68ec
Merge branch 'e2e-otlp' of https://github.com/chahatsagarmain/jaeger …
chahatsagarmain Jan 18, 2025
d750fbc
remove unecessasry code
chahatsagarmain Jan 18, 2025
3868088
Update cmd/collector/app/span_processor_test.go
yurishkuro Jan 19, 2025
90f8935
Merge branch 'e2e-otlp' of https://github.com/chahatsagarmain/jaeger …
chahatsagarmain Jan 21, 2025
28c1231
added grpc test
chahatsagarmain Jan 21, 2025
04c9aab
refactoring
chahatsagarmain Jan 21, 2025
a89135f
Merge branch 'main' into e2e-otlp
chahatsagarmain Jan 21, 2025
95712f2
changes
chahatsagarmain Jan 22, 2025
37b1105
Merge branch 'main' into e2e-otlp
chahatsagarmain Jan 23, 2025
a0b9aeb
changes
chahatsagarmain Jan 25, 2025
4330264
changes
chahatsagarmain Jan 25, 2025
1a9f494
Merge branch 'e2e-otlp' of https://github.com/chahatsagarmain/jaeger …
chahatsagarmain Jan 25, 2025
11d65ce
Merge branch 'main' into e2e-otlp
chahatsagarmain Jan 25, 2025
9544bad
lint
chahatsagarmain Jan 25, 2025
2befd5d
isoloate mock writer
chahatsagarmain Jan 26, 2025
b589977
valid tenant and refactored test
chahatsagarmain Jan 26, 2025
1d7f157
refactor
chahatsagarmain Jan 26, 2025
7b40951
Merge branch 'main' into e2e-otlp
chahatsagarmain Jan 28, 2025
aaedf3c
lint
chahatsagarmain Jan 28, 2025
1e21118
refactor and fix test
chahatsagarmain Jan 29, 2025
1342050
Merge branch 'main' into e2e-otlp
chahatsagarmain Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"context"
"errors"

"go.opentelemetry.io/collector/client"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip" // register zip encoding
Expand Down Expand Up @@ -98,12 +99,19 @@
return "", nil
}

httpMd := client.FromContext(ctx)
tenants := httpMd.Metadata.Get(c.tenancyMgr.Header)

if len(tenants) > 0 {
return tenants[0], nil
}

Check warning on line 107 in cmd/collector/app/handler/grpc_handler.go

View check run for this annotation

Codecov / codecov/patch

cmd/collector/app/handler/grpc_handler.go#L106-L107

Added lines #L106 - L107 were not covered by tests

md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
}

tenants := md.Get(c.tenancyMgr.Header)
tenants = md.Get(c.tenancyMgr.Header)
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
Expand Down
194 changes: 194 additions & 0 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,41 @@ import (
"errors"
"fmt"
"io"
"net/http"
"reflect"
"slices"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/pdata/ptrace"
otlptrace "go.opentelemetry.io/proto/otlp/collector/trace/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

"github.com/jaegertracing/jaeger-idl/model/v1"
cFlags "github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
Expand Down Expand Up @@ -807,3 +822,182 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) {
require.EqualError(t, err, processor.ErrBusy.Error())
assert.Equal(t, []string{"op3"}, droppedOperations)
}

func optionsWithPorts(portHttp string, portGrpc string) *cFlags.CollectorOptions {
opts := &cFlags.CollectorOptions{
OTLP: struct {
Enabled bool
GRPC configgrpc.ServerConfig
HTTP confighttp.ServerConfig
}{
Enabled: true,
HTTP: confighttp.ServerConfig{
Endpoint: portHttp,
IncludeMetadata: true,
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: portGrpc,
Transport: confignet.TransportTypeTCP,
},
},
},
}
return opts
}

func TestOTLPReceiverWithV2Storage(t *testing.T) {
// Setup mock writer and expectations
mockWriter := mocks.NewWriter(t)

var receivedTraces atomic.Pointer[ptrace.Traces]
var receivedCtx atomic.Pointer[context.Context]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you cannot share state between tests like this, it creates side effects. HTTP and gRPC should be tested in complete isolation.

mockWriter.On("WriteTraces", mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
storeContext := args.Get(0).(context.Context)
storeTrace := args.Get(1).(ptrace.Traces)
receivedTraces.Store(&storeTrace)
receivedCtx.Store(&storeContext)
}).Return(nil)

spanProcessor, err := NewSpanProcessor(
mockWriter,
nil,
Options.NumWorkers(1),
Options.QueueSize(1),
Options.ReportBusy(true),
)
require.NoError(t, err)
defer spanProcessor.Close()
logger := zaptest.NewLogger(t)

portHttp := "4317"
portGrpc := "4318"

// Send trace via HTTP
t.Run("Send trace data using HTTP connection", func(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are multiple ways to organize the tests, yours makes the least sense to me.

Option 1: two top-level functions, with shared helper functions
Option 2: a single test function with table-driven tests, no setup helpers needed but each test has a lambda function parameter executeRequest implemented with two other functions, for http and grpc separately

Your test is only checking success path, does not check scenario when incoming tenant is invalid.

// Can't send tenancy headers with http request to OTLP receiver
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

old comment

tenancyMgr := &tenancy.Manager{
Enabled: true,
Header: "x-tenant",
}

rec, err := handler.StartOTLPReceiver(
optionsWithPorts(fmt.Sprintf("localhost:%v", portHttp), fmt.Sprintf("localhost:%v", portGrpc)),
logger,
spanProcessor,
tenancyMgr,
)
require.NoError(t, err)
ctx := context.Background()
defer rec.Shutdown(ctx)
Copy link
Member

@yurishkuro yurishkuro Jan 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from L852 you are repeating a lot of code for each test. You can create a helper function that will initialize the receiver (it can be a lambda function).


url := fmt.Sprintf("http://localhost:%v/v1/traces", portHttp)

traceJSON := `{
"resourceSpans": [{
"scopeSpans": [{
"spans": [{
"name": "test-trace"
}]
}]
}]
}`

httpClient := http.Client{}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, strings.NewReader(traceJSON))
require.NoError(t, err)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-tenant", "test-tenant")
resp, err := httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)

assert.Eventually(t, func() bool {
storedTraces := receivedTraces.Load()
storedCtx := receivedCtx.Load()
if storedTraces == nil || storedCtx == nil {
return false
}
receivedSpan := storedTraces.ResourceSpans().At(0).
ScopeSpans().At(0).
Spans().At(0)
receivedTenant := tenancy.GetTenant(*storedCtx)
return receivedSpan.Name() == "test-trace" && receivedTenant == "test-tenant"
}, 1*time.Second, 1*time.Millisecond)

mockWriter.AssertExpectations(t)
})

t.Run("Send trace data using GRPC connection", func(t *testing.T) {
// Setup proper tenancy manager using NewManager
tenancyMgr := tenancy.NewManager(&tenancy.Options{
Enabled: true,
Header: "x-tenant",
Tenants: []string{"test-tenant"},
})

// Create and start receiver
rec, err := handler.StartOTLPReceiver(
optionsWithPorts(fmt.Sprintf("localhost:%v", portHttp), fmt.Sprintf("localhost:%v", portGrpc)),
logger,
spanProcessor,
tenancyMgr,
)
require.NoError(t, err)
ctx := context.Background()
defer rec.Shutdown(ctx)

// Create gRPC client connection
conn, err := grpc.NewClient(
fmt.Sprintf("localhost:%v", portGrpc),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
require.NoError(t, err)
defer conn.Close()

// Create metadata with tenant information
md := metadata.New(map[string]string{
tenancyMgr.Header: "test-tenant",
})
ctxWithMD := metadata.NewOutgoingContext(ctx, md)

client := otlptrace.NewTraceServiceClient(conn)
req := &otlptrace.ExportTraceServiceRequest{
ResourceSpans: []*tracepb.ResourceSpans{
{
ScopeSpans: []*tracepb.ScopeSpans{
{
Spans: []*tracepb.Span{
{
Name: "test-trace",
},
},
},
},
},
},
}

// Send traces via gRPC
_, err = client.Export(ctxWithMD, req)
require.NoError(t, err)

assert.Eventually(t, func() bool {
storedTraces := receivedTraces.Load()
storedCtx := receivedCtx.Load()
if storedTraces == nil || storedCtx == nil {
return false
}
receivedSpan := storedTraces.ResourceSpans().At(0).
ScopeSpans().At(0).
Spans().At(0)
receivedTenant := tenancy.GetTenant(*storedCtx)
return receivedSpan.Name() == "test-trace" && receivedTenant == "test-tenant"
}, 1*time.Second, 1*time.Millisecond)

mockWriter.AssertExpectations(t)
})
}
Loading