embed: add integration test for distributed tracing #14348

Merged (3 commits) on Oct 12, 2022
51 changes: 36 additions & 15 deletions server/embed/config_tracing.go
@@ -40,13 +40,19 @@ func validateTracingConfig(samplingRate int) error {
return nil
}

func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.SpanExporter, options []otelgrpc.Option, err error) {
exporter, err = otlptracegrpc.New(ctx,
type tracingExporter struct {
exporter tracesdk.SpanExporter
opts []otelgrpc.Option
provider *tracesdk.TracerProvider
}

func newTracingExporter(ctx context.Context, cfg *Config) (*tracingExporter, error) {
exporter, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
)
if err != nil {
return nil, nil, err
return nil, err
}

res, err := resource.New(ctx,
@@ -55,35 +61,37 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
),
)
if err != nil {
return nil, nil, err
return nil, err
}

if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
// Merge resources into a new
// resource in case of duplicates.
res, err = resource.Merge(res, resWithIDKey)
if err != nil {
return nil, nil, err
return nil, err
}
}

options = append(options,
traceProvider := tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exporter),
tracesdk.WithResource(res),
tracesdk.WithSampler(
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
),
)

options := []otelgrpc.Option{
otelgrpc.WithPropagators(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
),
),
otelgrpc.WithTracerProvider(
tracesdk.NewTracerProvider(
tracesdk.WithBatcher(exporter),
tracesdk.WithResource(res),
tracesdk.WithSampler(
tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
),
),
traceProvider,
),
)
}

cfg.logger.Debug(
"distributed tracing enabled",
@@ -93,7 +101,20 @@ func setupTracingExporter(ctx context.Context, cfg *Config) (exporter tracesdk.S
zap.Int("sampling-rate", cfg.ExperimentalDistributedTracingSamplingRatePerMillion),
)

return exporter, options, err
return &tracingExporter{
exporter: exporter,
opts: options,
provider: traceProvider,
}, nil
}

func (te *tracingExporter) Close(ctx context.Context) {
if te.provider != nil {
te.provider.Shutdown(ctx)
}
Review comment (Member):
I am not quite confident about this. Have you considered the order of the shutdown? In other words, is there any potential issue with shutting down the exporter first? From a quick look at the otel SDK, the provider calls back into the exporters; if we shut down the exporter first, could that cause a temporary problem in the provider?

Have you thought about shutting down the provider first?

Please clarify this.

Reply (Contributor, Author):
From my understanding, if the exporter is shut down first, the provider will either hit a timeout or a shutdown error, meaning spans that haven't yet been sent to the remote may be lost. Thank you for pointing it out. I changed it to shut down the provider first (a sketch of this ordering follows after this file's diff).

if te.exporter != nil {
te.exporter.Shutdown(ctx)
}
}

func determineSampler(samplingRate int) tracesdk.Sampler {
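A minimal standalone sketch (not part of the diff) of the shutdown ordering settled on in the thread above. The function name shutdownTracing, the 5-second timeout, and the error logging are illustrative assumptions; the PR's actual Close method simply calls the two Shutdown methods in this order.

```go
package tracingsketch

import (
	"context"
	"log"
	"time"

	tracesdk "go.opentelemetry.io/otel/sdk/trace"
)

// shutdownTracing shuts down the TracerProvider before the SpanExporter:
// the provider's Shutdown flushes any batched spans through the exporter,
// so the exporter is only closed once nothing else will call into it.
func shutdownTracing(ctx context.Context, tp *tracesdk.TracerProvider, exp tracesdk.SpanExporter) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // assumed bound, not from the PR
	defer cancel()
	if tp != nil {
		if err := tp.Shutdown(ctx); err != nil {
			log.Printf("tracer provider shutdown: %v", err) // e.g. flush timeout
		}
	}
	if exp != nil {
		if err := exp.Shutdown(ctx); err != nil {
			log.Printf("span exporter shutdown: %v", err)
		}
	}
}
```
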
9 changes: 4 additions & 5 deletions server/embed/etcd.go
@@ -227,15 +227,14 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {

if srvcfg.ExperimentalEnableDistributedTracing {
tctx := context.Background()
tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
tracingExporter, err := newTracingExporter(tctx, cfg)
if err != nil {
return e, err
}
if tracingExporter == nil || len(opts) == 0 {
return e, fmt.Errorf("error setting up distributed tracing")
Review comment (Member):
You intentionally removed this; does it mean that tracingExporter == nil || len(opts) == 0 will never be true if the returned err is nil?

Reply (Contributor, Author):
Yes, that's the intention. I noticed there is no code path where err == nil and either the exporter or opts is nil. (A consolidated view of the new call site appears after this file's diff.)

Review comment (Member):
ack

e.tracingExporterShutdown = func() {
tracingExporter.Close(tctx)
}
e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
srvcfg.ExperimentalTracerOptions = opts
srvcfg.ExperimentalTracerOptions = tracingExporter.opts

e.cfg.logger.Info(
"distributed tracing setup enabled",
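Because added and removed lines are interleaved in the diff above, here is a consolidated, non-authoritative reconstruction of how the tracing block inside StartEtcd reads after this change (it is a snippet from the surrounding function, not a standalone program; logger fields are elided as in the diff):

```go
if srvcfg.ExperimentalEnableDistributedTracing {
	tctx := context.Background()
	tracingExporter, err := newTracingExporter(tctx, cfg)
	if err != nil {
		return e, err // a nil error now guarantees a usable exporter and options
	}
	e.tracingExporterShutdown = func() {
		tracingExporter.Close(tctx) // shuts down the provider first, then the exporter
	}
	srvcfg.ExperimentalTracerOptions = tracingExporter.opts

	e.cfg.logger.Info(
		"distributed tracing setup enabled",
		// ... (fields elided in the diff)
	)
}
```
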
10 changes: 5 additions & 5 deletions tests/go.mod
@@ -37,6 +37,11 @@ require (
go.etcd.io/etcd/pkg/v3 v3.6.0-alpha.0
go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0
go.etcd.io/etcd/server/v3 v3.6.0-alpha.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
go.opentelemetry.io/proto/otlp v0.16.0
go.uber.org/zap v1.21.0
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
@@ -80,14 +85,9 @@ require (
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0 // indirect
go.opentelemetry.io/otel/sdk v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
go.opentelemetry.io/proto/otlp v0.16.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/net v0.0.0-20220615171555-694bf12d69de // indirect
146 changes: 146 additions & 0 deletions tests/integration/tracing_test.go
@@ -0,0 +1,146 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.etcd.io/etcd/client/pkg/v3/testutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/tests/v3/framework/integration"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
traceservice "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
)

// TestTracing ensures that distributed tracing is set up when the feature flag is enabled.
func TestTracing(t *testing.T) {
testutil.SkipTestIfShortMode(t,
"Wal creation tests are depending on embedded etcd server so are integration-level tests.")
// set up trace collector
listener, err := net.Listen("tcp", "localhost:")
if err != nil {
t.Fatal(err)
}

traceFound := make(chan struct{})
defer close(traceFound)

srv := grpc.NewServer()
traceservice.RegisterTraceServiceServer(srv, &traceServer{
traceFound: traceFound,
filterFunc: containsNodeListSpan})

go srv.Serve(listener)
defer srv.Stop()

cfg := integration.NewEmbedConfig(t, "default")
cfg.ExperimentalEnableDistributedTracing = true
cfg.ExperimentalDistributedTracingAddress = listener.Addr().String()
cfg.ExperimentalDistributedTracingServiceName = "integration-test-tracing"
cfg.ExperimentalDistributedTracingSamplingRatePerMillion = 100

// start an etcd instance with tracing enabled
etcdSrv, err := embed.StartEtcd(cfg)
if err != nil {
t.Fatal(err)
}
defer etcdSrv.Close()

select {
case <-etcdSrv.Server.ReadyNotify():
case <-time.After(1 * time.Second):
t.Fatalf("failed to start embed.Etcd for test")
}

// create a client that has tracing enabled
tracer := sdktrace.NewTracerProvider(sdktrace.WithSampler(sdktrace.AlwaysSample()))
defer tracer.Shutdown(context.TODO())
tp := trace.TracerProvider(tracer)

tracingOpts := []otelgrpc.Option{
otelgrpc.WithTracerProvider(tp),
otelgrpc.WithPropagators(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{},
propagation.Baggage{},
)),
}

dialOptions := []grpc.DialOption{
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...))}
ccfg := clientv3.Config{DialOptions: dialOptions, Endpoints: []string{cfg.ACUrls[0].String()}}
cli, err := integration.NewClient(t, ccfg)
if err != nil {
etcdSrv.Close()
t.Fatal(err)
}
defer cli.Close()

// make a request with the instrumented client
resp, err := cli.Get(context.TODO(), "key")
require.NoError(t, err)
require.Empty(t, resp.Kvs)

// Wait for a span to be recorded from our request
select {
case <-traceFound:
return
case <-time.After(30 * time.Second):
t.Fatal("Timed out waiting for trace")
}
}

func containsNodeListSpan(req *traceservice.ExportTraceServiceRequest) bool {
for _, resourceSpans := range req.GetResourceSpans() {
for _, attr := range resourceSpans.GetResource().GetAttributes() {
if attr.GetKey() != "service.name" && attr.GetValue().GetStringValue() != "integration-test-tracing" {
continue
}
for _, scoped := range resourceSpans.GetScopeSpans() {
for _, span := range scoped.GetSpans() {
if span.GetName() == "etcdserverpb.KV/Range" {
return true
}
}
}
}
}
return false
}

// traceServer implements TracesServiceServer
type traceServer struct {
traceFound chan struct{}
filterFunc func(req *traceservice.ExportTraceServiceRequest) bool
traceservice.UnimplementedTraceServiceServer
}

func (t *traceServer) Export(ctx context.Context, req *traceservice.ExportTraceServiceRequest) (*traceservice.ExportTraceServiceResponse, error) {
var emptyValue = traceservice.ExportTraceServiceResponse{}
if t.filterFunc(req) {
t.traceFound <- struct{}{}
}
return &emptyValue, nil
}