diff --git a/server/embed/config_tracing.go b/server/embed/config_tracing.go index 2ee7035431f..9e03d03e320 100644 --- a/server/embed/config_tracing.go +++ b/server/embed/config_tracing.go @@ -40,13 +40,19 @@ func validateTracingConfig(samplingRate int) error { return nil } -func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) { - exporter, err = otlptracegrpc.New(ctx, +type tracingExporter struct { + exporter tracesdk.SpanExporter + opts []otelgrpc.Option + provider *tracesdk.TracerProvider +} + +func newTracingExporter(ctx context.Context, cfg *Config) (*tracingExporter, error) { + exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithInsecure(), otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress), ) if err != nil { - return nil, nil, err + return nil, err } res, err := resource.New(ctx, @@ -55,7 +61,7 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S ), ) if err != nil { - return nil, nil, err + return nil, err } if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil { @@ -63,11 +69,19 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S // resource in case of duplicates. res, err = resource.Merge(res, resWithIDKey) if err != nil { - return nil, nil, err + return nil, err } } - options = append(options, + traceProvider := tracesdk.NewTracerProvider( + tracesdk.WithBatcher(exporter), + tracesdk.WithResource(res), + tracesdk.WithSampler( + tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)), + ), + ) + + options := []otelgrpc.Option{ otelgrpc.WithPropagators( propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, @@ -75,15 +89,9 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S ), ), otelgrpc.WithTracerProvider( - tracesdk.NewTracerProvider( - tracesdk.WithBatcher(exporter), - tracesdk.WithResource(res), - tracesdk.WithSampler( - tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)), - ), - ), + traceProvider, ), - ) + } cfg.logger.Debug( "distributed tracing enabled", @@ -93,7 +101,20 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S zap.Int("sampling-rate", cfg.ExperimentalDistributedTracingSamplingRatePerMillion), ) - return exporter, options, err + return &tracingExporter{ + exporter: exporter, + opts: options, + provider: traceProvider, + }, nil +} + +func (te *tracingExporter) Close(ctx context.Context) { + if te.provider != nil { + te.provider.Shutdown(ctx) + } + if te.exporter != nil { + te.exporter.Shutdown(ctx) + } } func determineSampler(samplingRate int) tracesdk.Sampler { diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 3d94b63be1e..0f11e9584a1 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -227,15 +227,14 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { if srvcfg.ExperimentalEnableDistributedTracing { tctx := context.Background() - tracingExporter, opts, err := setupTracingExporter(tctx, cfg) + tracingExporter, err := newTracingExporter(tctx, cfg) if err != nil { return e, err } - if tracingExporter == nil || len(opts) == 0 { - return e, fmt.Errorf("error setting up distributed tracing") + e.tracingExporterShutdown = func() { + tracingExporter.Close(tctx) } - e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) } - srvcfg.ExperimentalTracerOptions = opts + srvcfg.ExperimentalTracerOptions = tracingExporter.opts e.cfg.logger.Info( "distributed tracing setup enabled", diff --git a/tests/go.mod b/tests/go.mod index 0ce05f47124..67a5c733687 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -37,6 +37,11 @@ require ( go.etcd.io/etcd/pkg/v3 v3.6.0-alpha.0 go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0 go.etcd.io/etcd/server/v3 v3.6.0-alpha.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 + go.opentelemetry.io/otel v1.7.0 + go.opentelemetry.io/otel/sdk v1.7.0 + go.opentelemetry.io/otel/trace v1.7.0 + go.opentelemetry.io/proto/otlp v0.16.0 go.uber.org/zap v1.21.0 golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f @@ -80,14 +85,9 @@ require ( github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect go.etcd.io/bbolt v1.3.6 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect - go.opentelemetry.io/otel v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0 // indirect - go.opentelemetry.io/otel/sdk v1.7.0 // indirect - go.opentelemetry.io/otel/trace v1.7.0 // indirect - go.opentelemetry.io/proto/otlp v0.16.0 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/net v0.0.0-20220615171555-694bf12d69de // indirect diff --git a/tests/integration/tracing_test.go b/tests/integration/tracing_test.go new file mode 100644 index 00000000000..53fe379a52c --- /dev/null +++ b/tests/integration/tracing_test.go @@ -0,0 +1,146 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/client/pkg/v3/testutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/embed" + "go.etcd.io/etcd/tests/v3/framework/integration" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/propagation" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1" + "google.golang.org/grpc" +) + +// TestTracing ensures that distributed tracing is setup when the feature flag is enabled. +func TestTracing(t *testing.T) { + testutil.SkipTestIfShortMode(t, + "Wal creation tests are depending on embedded etcd server so are integration-level tests.") + // set up trace collector + listener, err := net.Listen("tcp", "localhost:") + if err != nil { + t.Fatal(err) + } + + traceFound := make(chan struct{}) + defer close(traceFound) + + srv := grpc.NewServer() + traceservice.RegisterTraceServiceServer(srv, &traceServer{ + traceFound: traceFound, + filterFunc: containsNodeListSpan}) + + go srv.Serve(listener) + defer srv.Stop() + + cfg := integration.NewEmbedConfig(t, "default") + cfg.ExperimentalEnableDistributedTracing = true + cfg.ExperimentalDistributedTracingAddress = listener.Addr().String() + cfg.ExperimentalDistributedTracingServiceName = "integration-test-tracing" + cfg.ExperimentalDistributedTracingSamplingRatePerMillion = 100 + + // start an etcd instance with tracing enabled + etcdSrv, err := embed.StartEtcd(cfg) + if err != nil { + t.Fatal(err) + } + defer etcdSrv.Close() + + select { + case <-etcdSrv.Server.ReadyNotify(): + case <-time.After(1 * time.Second): + t.Fatalf("failed to start embed.Etcd for test") + } + + // create a client that has tracing enabled + tracer := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample())) + defer tracer.Shutdown(context.TODO()) + tp := trace.TracerProvider(tracer) + + tracingOpts := []otelgrpc.Option{ + otelgrpc.WithTracerProvider(tp), + otelgrpc.WithPropagators( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + )), + } + + dialOptions := []grpc.DialOption{ + grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)), + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...))} + ccfg := clientv3.Config{DialOptions: dialOptions, Endpoints: []string{cfg.ACUrls[0].String()}} + cli, err := integration.NewClient(t, ccfg) + if err != nil { + etcdSrv.Close() + t.Fatal(err) + } + defer cli.Close() + + // make a request with the instrumented client + resp, err := cli.Get(context.TODO(), "key") + require.NoError(t, err) + require.Empty(t, resp.Kvs) + + // Wait for a span to be recorded from our request + select { + case <-traceFound: + return + case <-time.After(30 * time.Second): + t.Fatal("Timed out waiting for trace") + } +} + +func containsNodeListSpan(req *traceservice.ExportTraceServiceRequest) bool { + for _, resourceSpans := range req.GetResourceSpans() { + for _, attr := range resourceSpans.GetResource().GetAttributes() { + if attr.GetKey() != "service.name" && attr.GetValue().GetStringValue() != "integration-test-tracing" { + continue + } + for _, scoped := range resourceSpans.GetScopeSpans() { + for _, span := range scoped.GetSpans() { + if span.GetName() == "etcdserverpb.KV/Range" { + return true + } + } + } + } + } + return false +} + +// traceServer implements TracesServiceServer +type traceServer struct { + traceFound chan struct{} + filterFunc func(req *traceservice.ExportTraceServiceRequest) bool + traceservice.UnimplementedTraceServiceServer +} + +func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) { + var emptyValue = traceservice.ExportTraceServiceResponse{} + if t.filterFunc(req) { + t.traceFound <- struct{}{} + } + return &emptyValue, nil +}