Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

embed: add integration test for distributed tracing #14348

Merged
merged 3 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions server/embed/config_tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,19 @@ func validateTracingConfig(samplingRate int) error {
return nil
}

func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
exporter, err = otlptracegrpc.New(ctx,
type tracingExporter struct {
exporter tracesdk.SpanExporter
opts []otelgrpc.Option
shutdown func(ctx context.Context)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name shutdown is a little strange to me, if tracingExporter knows what it should do on close, why not to get the traceProvider added to the struct directly? Two choice:

  1. Add the traceProvider into the struct directly, and call te.traceProvider.Shutdown(ctx) directly on Close.
  2. Rename to cleanup so that the logic is agnostic to tracingExporter, instead it's just doing the cleanup task it's told.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the suggestion. I went with the first option

}

func newTracingExporter(ctx context.Context, cfg *Config) (*tracingExporter, error) {
exporter, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
)
if err != nil {
return nil, nil, err
return nil, err
}

res, err := resource.New(ctx,
Expand All @@ -55,35 +61,36 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
),
)
if err != nil {
return nil, nil, err
return nil, err
}

if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
// Merge resources into a new
// resource in case of duplicates.
res, err = resource.Merge(res, resWithIDKey)
if err != nil {
return nil, nil, err
return nil, err
}
}

options = append(options,
traceProvider := tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exporter),
tracesdk.WithResource(res),
tracesdk.WithSampler(
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
),
)
options := []otelgrpc.Option{
otelgrpc.WithPropagators(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
),
),
otelgrpc.WithTracerProvider(
tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exporter),
tracesdk.WithResource(res),
tracesdk.WithSampler(
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
),
),
traceProvider,
),
)
}

cfg.logger.Debug(
"distributed tracing enabled",
Expand All @@ -93,7 +100,18 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
zap.Int("sampling-rate", cfg.ExperimentalDistributedTracingSamplingRatePerMillion),
)

return exporter, options, err
return &tracingExporter{
exporter: exporter,
opts: options,
shutdown: func(ctx context.Context) {
traceProvider.Shutdown(ctx)
},
}, nil
}

func (te *tracingExporter) Close(ctx context.Context) {
te.exporter.Shutdown(ctx)
te.shutdown(ctx)
}

func determineSampler(samplingRate int) tracesdk.Sampler {
Expand Down
9 changes: 4 additions & 5 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,14 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {

if srvcfg.ExperimentalEnableDistributedTracing {
tctx := context.Background()
tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
tracingExporter, err := newTracingExporter(tctx, cfg)
if err != nil {
return e, err
}
if tracingExporter == nil || len(opts) == 0 {
return e, fmt.Errorf("error setting up distributed tracing")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You intentionally removed this, does it mean that tracingExporter == nil || len(opts) == 0 will never be true if the returned err is nil?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the intention. I noticed no code path would result in err == nil and either exporter nor opts would be nil

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

e.tracingExporterShutdown = func() {
tracingExporter.Close(tctx)
}
e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
srvcfg.ExperimentalTracerOptions = opts
srvcfg.ExperimentalTracerOptions = tracingExporter.opts

e.cfg.logger.Info(
"distributed tracing setup enabled",
Expand Down
146 changes: 146 additions & 0 deletions tests/integration/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/pkg/v3/testutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/tests/v3/framework/integration"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
)

// TestTracing ensures that distributed tracing is setup when the feature flag is enabled.
func TestTracing(t *testing.T) {
testutil.SkipTestIfShortMode(t,
"Wal creation tests are depending on embedded etcd server so are integration-level tests.")
// set up trace collector
listener, err := net.Listen("tcp", "localhost:")
if err != nil {
t.Fatal(err)
}

traceFound := make(chan struct{})
defer close(traceFound)

srv := grpc.NewServer()
traceservice.RegisterTraceServiceServer(srv, &traceServer{
traceFound: traceFound,
filterFunc: containsNodeListSpan})

go srv.Serve(listener)
defer srv.Stop()

cfg := integration.NewEmbedConfig(t, "default")
cfg.ExperimentalEnableDistributedTracing = true
cfg.ExperimentalDistributedTracingAddress = listener.Addr().String()
cfg.ExperimentalDistributedTracingServiceName = "integration-test-tracing"
cfg.ExperimentalDistributedTracingSamplingRatePerMillion = 100

// start an etcd instance with tracing enabled
etcdSrv, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
defer etcdSrv.Close()

select {
case <-etcdSrv.Server.ReadyNotify():
case <-time.After(1 * time.Second):
t.Fatalf("failed to start embed.Etcd for test")
}

// create a client that has tracing enabled
tracer := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
defer tracer.Shutdown(context.TODO())
tp := trace.TracerProvider(tracer)

tracingOpts := []otelgrpc.Option{
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithPropagators(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)),
}

dialOptions := []grpc.DialOption{
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...))}
ccfg := clientv3.Config{DialOptions: dialOptions, Endpoints: []string{cfg.ACUrls[0].String()}}
cli, err := integration.NewClient(t, ccfg)
if err != nil {
etcdSrv.Close()
t.Fatal(err)
}
defer cli.Close()

// make a request with the instrumented client
resp, err := cli.Get(context.TODO(), "key")
require.NoError(t, err)
require.Empty(t, resp.Kvs)

// Wait for a span to be recorded from our request
select {
case <-traceFound:
return
case <-time.After(30 * time.Second):
t.Fatal("Timed out waiting for trace")
}
}

func containsNodeListSpan(req *traceservice.ExportTraceServiceRequest) bool {
for _, resourceSpans := range req.GetResourceSpans() {
for _, attr := range resourceSpans.GetResource().GetAttributes() {
if attr.GetKey() != "service.name" && attr.GetValue().GetStringValue() != "integration-test-tracing" {
continue
}
for _, scoped := range resourceSpans.GetScopeSpans() {
for _, span := range scoped.GetSpans() {
if span.GetName() == "etcdserverpb.KV/Range" {
return true
}
}
}
}
}
return false
}

// traceServer implements TracesServiceServer
type traceServer struct {
traceFound chan struct{}
filterFunc func(req *traceservice.ExportTraceServiceRequest) bool
traceservice.UnimplementedTraceServiceServer
}

func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) {
var emptyValue = traceservice.ExportTraceServiceResponse{}
if t.filterFunc(req) {
t.traceFound <- struct{}{}
}
return &emptyValue, nil
}