From 1f4d1d21dc63636c22b64211fb3f5e09dc9503df Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Thu, 26 May 2022 16:16:13 -0400 Subject: [PATCH] [refactor] Simplify field naming in CollectorOptions (#3707) --- cmd/all-in-one/main.go | 2 +- cmd/collector/app/collector.go | 34 ++++---- .../app/{builder_flags.go => flags.go} | 87 ++++++++++--------- .../{builder_flags_test.go => flags_test.go} | 18 ++-- go.mod | 4 +- go.sum | 8 +- 6 files changed, 81 insertions(+), 72 deletions(-) rename cmd/collector/app/{builder_flags.go => flags.go} (69%) rename cmd/collector/app/{builder_flags_test.go => flags_test.go} (85%) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 65b80ca6d7f..269595a3291 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -172,7 +172,7 @@ by default uses only in-memory database.`, // agent // if the agent reporter grpc host:port was not explicitly set then use whatever the collector is listening on if len(grpcBuilder.CollectorHostPorts) == 0 { - grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, cOpts.CollectorGRPCHostPort) + grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, cOpts.GRPC.HostPort) } agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) builders := map[agentRep.Type]agentApp.CollectorProxyBuilder{ diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 0f92a9965e7..fa0fd01c888 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -79,10 +79,10 @@ func New(params *CollectorParams) *Collector { } // Start the component and underlying dependencies -func (c *Collector) Start(builderOpts *CollectorOptions) error { +func (c *Collector) Start(options *CollectorOptions) error { handlerBuilder := &SpanHandlerBuilder{ SpanWriter: c.spanWriter, - CollectorOpts: *builderOpts, + CollectorOpts: *options, Logger: c.logger, MetricsFactory: c.metricsFactory, } @@ -96,14 +96,14 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor) grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ - HostPort: builderOpts.CollectorGRPCHostPort, + HostPort: options.GRPC.HostPort, Handler: c.spanHandlers.GRPCHandler, - TLSConfig: builderOpts.TLSGRPC, + TLSConfig: options.GRPC.TLS, SamplingStore: c.strategyStore, Logger: c.logger, - MaxReceiveMessageLength: builderOpts.CollectorGRPCMaxReceiveMessageLength, - MaxConnectionAge: builderOpts.CollectorGRPCMaxConnectionAge, - MaxConnectionAgeGrace: builderOpts.CollectorGRPCMaxConnectionAgeGrace, + MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength, + MaxConnectionAge: options.GRPC.MaxConnectionAge, + MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace, }) if err != nil { return fmt.Errorf("could not start gRPC collector %w", err) @@ -111,9 +111,9 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { c.grpcServer = grpcServer httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{ - HostPort: builderOpts.CollectorHTTPHostPort, + HostPort: options.HTTP.HostPort, Handler: c.spanHandlers.JaegerBatchesHandler, - TLSConfig: builderOpts.TLSHTTP, + TLSConfig: options.HTTP.TLS, HealthCheck: c.hCheck, MetricsFactory: c.metricsFactory, SamplingStore: c.strategyStore, @@ -124,16 +124,16 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { } c.hServer = httpServer - c.tlsGRPCCertWatcherCloser = &builderOpts.TLSGRPC - c.tlsHTTPCertWatcherCloser = &builderOpts.TLSHTTP - c.tlsZipkinCertWatcherCloser = &builderOpts.TLSZipkin + c.tlsGRPCCertWatcherCloser = &options.GRPC.TLS + c.tlsHTTPCertWatcherCloser = &options.HTTP.TLS + c.tlsZipkinCertWatcherCloser = &options.Zipkin.TLS zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{ - HostPort: builderOpts.CollectorZipkinHTTPHostPort, + HostPort: options.Zipkin.HTTPHostPort, Handler: c.spanHandlers.ZipkinSpansHandler, - TLSConfig: builderOpts.TLSZipkin, + TLSConfig: options.Zipkin.TLS, HealthCheck: c.hCheck, - AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders, - AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins, + AllowedHeaders: options.Zipkin.AllowedHeaders, + AllowedOrigins: options.Zipkin.AllowedOrigins, Logger: c.logger, MetricsFactory: c.metricsFactory, }) @@ -142,7 +142,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { } c.zkServer = zkServer - c.publishOpts(builderOpts) + c.publishOpts(options) return nil } diff --git a/cmd/collector/app/builder_flags.go b/cmd/collector/app/flags.go similarity index 69% rename from cmd/collector/app/builder_flags.go rename to cmd/collector/app/flags.go index fc22d42db2c..390247013b7 100644 --- a/cmd/collector/app/builder_flags.go +++ b/cmd/collector/app/flags.go @@ -62,32 +62,41 @@ type CollectorOptions struct { QueueSize int // NumWorkers is the number of internal workers in a collector NumWorkers int - // CollectorHTTPHostPort is the host:port address that the collector service listens in on for http requests - CollectorHTTPHostPort string - // CollectorGRPCHostPort is the host:port address that the collector service listens in on for gRPC requests - CollectorGRPCHostPort string - // TLSGRPC configures secure transport for gRPC endpoint to collect spans - TLSGRPC tlscfg.Options - // TLSHTTP configures secure transport for HTTP endpoint to collect spans - TLSHTTP tlscfg.Options - // TLSZipkin configures secure transport for Zipkin endpoint to collect spans - TLSZipkin tlscfg.Options + // HTTP section defines options for HTTP server + HTTP struct { + // HostPort is the host:port address that the collector service listens in on for http requests + HostPort string + // TLS configures secure transport for HTTP endpoint to collect spans + TLS tlscfg.Options + } + // GRPC section defines options for gRPC server + GRPC struct { + // HostPort is the host:port address that the collector service listens in on for gRPC requests + HostPort string + // TLS configures secure transport for gRPC endpoint to collect spans + TLS tlscfg.Options + // MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector. + MaxReceiveMessageLength int + // MaxConnectionAge is a duration for the maximum amount of time a connection may exist. + // See gRPC's keepalive.ServerParameters#MaxConnectionAge. + MaxConnectionAge time.Duration + // MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. + // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. + MaxConnectionAgeGrace time.Duration + } + // Zipkin section defines options for Zipkin HTTP server + Zipkin struct { + // HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests + HTTPHostPort string + // ZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from + AllowedOrigins string + // ZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests + AllowedHeaders string + // TLS configures secure transport for Zipkin endpoint to collect spans + TLS tlscfg.Options + } // CollectorTags is the string representing collector tags to append to each and every span CollectorTags map[string]string - // CollectorZipkinHTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests - CollectorZipkinHTTPHostPort string - // CollectorZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from - CollectorZipkinAllowedOrigins string - // CollectorZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests - CollectorZipkinAllowedHeaders string - // CollectorGRPCMaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector. - CollectorGRPCMaxReceiveMessageLength int - // CollectorGRPCMaxConnectionAge is a duration for the maximum amount of time a connection may exist. - // See gRPC's keepalive.ServerParameters#MaxConnectionAge. - CollectorGRPCMaxConnectionAge time.Duration - // CollectorGRPCMaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. - // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. - CollectorGRPCMaxConnectionAgeGrace time.Duration } // AddFlags adds flags for CollectorOptions @@ -112,33 +121,33 @@ func AddFlags(flags *flag.FlagSet) { // InitFromViper initializes CollectorOptions with properties from viper func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) (*CollectorOptions, error) { - cOpts.CollectorGRPCHostPort = ports.FormatHostPort(v.GetString(collectorGRPCHostPort)) - cOpts.CollectorHTTPHostPort = ports.FormatHostPort(v.GetString(collectorHTTPHostPort)) - cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags)) - cOpts.CollectorZipkinAllowedHeaders = v.GetString(collectorZipkinAllowedHeaders) - cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins) - cOpts.CollectorZipkinHTTPHostPort = ports.FormatHostPort(v.GetString(collectorZipkinHTTPHostPort)) - cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes - cOpts.NumWorkers = v.GetInt(collectorNumWorkers) - cOpts.QueueSize = v.GetInt(collectorQueueSize) - cOpts.CollectorGRPCMaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength) - cOpts.CollectorGRPCMaxConnectionAge = v.GetDuration(collectorMaxConnectionAge) - cOpts.CollectorGRPCMaxConnectionAgeGrace = v.GetDuration(collectorMaxConnectionAgeGrace) + cOpts.GRPC.HostPort = ports.FormatHostPort(v.GetString(collectorGRPCHostPort)) + cOpts.GRPC.MaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength) + cOpts.GRPC.MaxConnectionAge = v.GetDuration(collectorMaxConnectionAge) + cOpts.GRPC.MaxConnectionAgeGrace = v.GetDuration(collectorMaxConnectionAgeGrace) if tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v); err == nil { - cOpts.TLSGRPC = tlsGrpc + cOpts.GRPC.TLS = tlsGrpc } else { return cOpts, fmt.Errorf("failed to parse gRPC TLS options: %w", err) } + cOpts.HTTP.HostPort = ports.FormatHostPort(v.GetString(collectorHTTPHostPort)) if tlsHTTP, err := tlsHTTPFlagsConfig.InitFromViper(v); err == nil { - cOpts.TLSHTTP = tlsHTTP + cOpts.HTTP.TLS = tlsHTTP } else { return cOpts, fmt.Errorf("failed to parse HTTP TLS options: %w", err) } + cOpts.Zipkin.AllowedHeaders = v.GetString(collectorZipkinAllowedHeaders) + cOpts.Zipkin.AllowedOrigins = v.GetString(collectorZipkinAllowedOrigins) + cOpts.Zipkin.HTTPHostPort = ports.FormatHostPort(v.GetString(collectorZipkinHTTPHostPort)) if tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v); err == nil { - cOpts.TLSZipkin = tlsZipkin + cOpts.Zipkin.TLS = tlsZipkin } else { return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err) } + cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags)) + cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes + cOpts.NumWorkers = v.GetInt(collectorNumWorkers) + cOpts.QueueSize = v.GetInt(collectorQueueSize) return cOpts, nil } diff --git a/cmd/collector/app/builder_flags_test.go b/cmd/collector/app/flags_test.go similarity index 85% rename from cmd/collector/app/builder_flags_test.go rename to cmd/collector/app/flags_test.go index 3a1d7675007..3b3b5b30a79 100644 --- a/cmd/collector/app/builder_flags_test.go +++ b/cmd/collector/app/flags_test.go @@ -34,9 +34,9 @@ func TestCollectorOptionsWithFlags_CheckHostPort(t *testing.T) { }) c.InitFromViper(v) - assert.Equal(t, ":5678", c.CollectorHTTPHostPort) - assert.Equal(t, ":1234", c.CollectorGRPCHostPort) - assert.Equal(t, ":3456", c.CollectorZipkinHTTPHostPort) + assert.Equal(t, ":5678", c.HTTP.HostPort) + assert.Equal(t, ":1234", c.GRPC.HostPort) + assert.Equal(t, ":3456", c.Zipkin.HTTPHostPort) } func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) { @@ -49,9 +49,9 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) { }) c.InitFromViper(v) - assert.Equal(t, ":5678", c.CollectorHTTPHostPort) - assert.Equal(t, "127.0.0.1:1234", c.CollectorGRPCHostPort) - assert.Equal(t, "0.0.0.0:3456", c.CollectorZipkinHTTPHostPort) + assert.Equal(t, ":5678", c.HTTP.HostPort) + assert.Equal(t, "127.0.0.1:1234", c.GRPC.HostPort) + assert.Equal(t, "0.0.0.0:3456", c.Zipkin.HTTPHostPort) } func TestCollectorOptionsWithFailedHTTPFlags(t *testing.T) { @@ -101,7 +101,7 @@ func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) { }) c.InitFromViper(v) - assert.Equal(t, 8388608, c.CollectorGRPCMaxReceiveMessageLength) + assert.Equal(t, 8388608, c.GRPC.MaxReceiveMessageLength) } func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) { @@ -113,6 +113,6 @@ func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) { }) c.InitFromViper(v) - assert.Equal(t, 5*time.Minute, c.CollectorGRPCMaxConnectionAge) - assert.Equal(t, time.Minute, c.CollectorGRPCMaxConnectionAgeGrace) + assert.Equal(t, 5*time.Minute, c.GRPC.MaxConnectionAge) + assert.Equal(t, time.Minute, c.GRPC.MaxConnectionAgeGrace) } diff --git a/go.mod b/go.mod index 1d90d18a2c8..62576afaa4d 100644 --- a/go.mod +++ b/go.mod @@ -43,8 +43,8 @@ require ( github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible github.com/xdg-go/scram v1.1.1 - go.opentelemetry.io/collector/pdata v0.51.0 - go.opentelemetry.io/collector/semconv v0.51.0 + go.opentelemetry.io/collector/pdata v0.52.0 + go.opentelemetry.io/collector/semconv v0.52.0 go.uber.org/atomic v1.9.0 go.uber.org/automaxprocs v1.5.1 go.uber.org/zap v1.21.0 diff --git a/go.sum b/go.sum index edc99891ab0..5380fd9c771 100644 --- a/go.sum +++ b/go.sum @@ -676,10 +676,10 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/collector/pdata v0.51.0 h1:J5CCnrg1iGOA1CfFOH+wAkMlJ1vjcm677kuPp18mbko= -go.opentelemetry.io/collector/pdata v0.51.0/go.mod h1:FsowYKNmf8CgsHgOfJv8V3KjALmy6FYQRHtXAOY3fho= -go.opentelemetry.io/collector/semconv v0.51.0 h1:Almgp3RZKDZNIp5491LfQ/oF2GG5gPK14IghjUxgkPg= -go.opentelemetry.io/collector/semconv v0.51.0/go.mod h1:SxK0rUnUP7YeDakexzbE/vhimTOHwE6m/4aKKd9e27Q= +go.opentelemetry.io/collector/pdata v0.52.0 h1:B0L9fkqKq5xRKFjICK9i11PRyTR52CCYSpTWaynf1Qc= +go.opentelemetry.io/collector/pdata v0.52.0/go.mod h1:GJUTfTv8mlYpHRjcmHXVbvJr48EW/q/P/HuBvpXAE58= +go.opentelemetry.io/collector/semconv v0.52.0 h1:ogYkQ6WL01xQ4GGSwWQejNTQwy/Pwcd6jCKFLSd7svA= +go.opentelemetry.io/collector/semconv v0.52.0/go.mod h1:SxK0rUnUP7YeDakexzbE/vhimTOHwE6m/4aKKd9e27Q= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=