Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Simplify field naming in CollectorOptions #3707

Merged
merged 2 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ by default uses only in-memory database.`,
// agent
// if the agent reporter grpc host:port was not explicitly set then use whatever the collector is listening on
if len(grpcBuilder.CollectorHostPorts) == 0 {
grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, cOpts.CollectorGRPCHostPort)
grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, cOpts.GRPC.HostPort)
}
agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})
builders := map[agentRep.Type]agentApp.CollectorProxyBuilder{
Expand Down
34 changes: 17 additions & 17 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func New(params *CollectorParams) *Collector {
}

// Start the component and underlying dependencies
func (c *Collector) Start(builderOpts *CollectorOptions) error {
func (c *Collector) Start(options *CollectorOptions) error {
handlerBuilder := &SpanHandlerBuilder{
SpanWriter: c.spanWriter,
CollectorOpts: *builderOpts,
CollectorOpts: *options,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
}
Expand All @@ -96,24 +96,24 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: builderOpts.CollectorGRPCHostPort,
HostPort: options.GRPC.HostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLSGRPC,
TLSConfig: options.GRPC.TLS,
SamplingStore: c.strategyStore,
Logger: c.logger,
MaxReceiveMessageLength: builderOpts.CollectorGRPCMaxReceiveMessageLength,
MaxConnectionAge: builderOpts.CollectorGRPCMaxConnectionAge,
MaxConnectionAgeGrace: builderOpts.CollectorGRPCMaxConnectionAgeGrace,
MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength,
MaxConnectionAge: options.GRPC.MaxConnectionAge,
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector %w", err)
}
c.grpcServer = grpcServer

httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: builderOpts.CollectorHTTPHostPort,
HostPort: options.HTTP.HostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: builderOpts.TLSHTTP,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Expand All @@ -124,16 +124,16 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
}
c.hServer = httpServer

c.tlsGRPCCertWatcherCloser = &builderOpts.TLSGRPC
c.tlsHTTPCertWatcherCloser = &builderOpts.TLSHTTP
c.tlsZipkinCertWatcherCloser = &builderOpts.TLSZipkin
c.tlsGRPCCertWatcherCloser = &options.GRPC.TLS
c.tlsHTTPCertWatcherCloser = &options.HTTP.TLS
c.tlsZipkinCertWatcherCloser = &options.Zipkin.TLS
zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{
HostPort: builderOpts.CollectorZipkinHTTPHostPort,
HostPort: options.Zipkin.HTTPHostPort,
Handler: c.spanHandlers.ZipkinSpansHandler,
TLSConfig: builderOpts.TLSZipkin,
TLSConfig: options.Zipkin.TLS,
HealthCheck: c.hCheck,
AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders,
AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins,
AllowedHeaders: options.Zipkin.AllowedHeaders,
AllowedOrigins: options.Zipkin.AllowedOrigins,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
})
Expand All @@ -142,7 +142,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
}
c.zkServer = zkServer

c.publishOpts(builderOpts)
c.publishOpts(options)

return nil
}
Expand Down
87 changes: 48 additions & 39 deletions cmd/collector/app/builder_flags.go → cmd/collector/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,41 @@ type CollectorOptions struct {
QueueSize int
// NumWorkers is the number of internal workers in a collector
NumWorkers int
// CollectorHTTPHostPort is the host:port address that the collector service listens in on for http requests
CollectorHTTPHostPort string
// CollectorGRPCHostPort is the host:port address that the collector service listens in on for gRPC requests
CollectorGRPCHostPort string
// TLSGRPC configures secure transport for gRPC endpoint to collect spans
TLSGRPC tlscfg.Options
// TLSHTTP configures secure transport for HTTP endpoint to collect spans
TLSHTTP tlscfg.Options
// TLSZipkin configures secure transport for Zipkin endpoint to collect spans
TLSZipkin tlscfg.Options
// HTTP section defines options for HTTP server
HTTP struct {
// HostPort is the host:port address that the collector service listens in on for http requests
HostPort string
// TLS configures secure transport for HTTP endpoint to collect spans
TLS tlscfg.Options
}
// GRPC section defines options for gRPC server
GRPC struct {
// HostPort is the host:port address that the collector service listens in on for gRPC requests
HostPort string
// TLS configures secure transport for gRPC endpoint to collect spans
TLS tlscfg.Options
// MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector.
MaxReceiveMessageLength int
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist.
// See gRPC's keepalive.ServerParameters#MaxConnectionAge.
MaxConnectionAge time.Duration
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
}
// Zipkin section defines options for Zipkin HTTP server
Zipkin struct {
// HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
HTTPHostPort string
// ZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from
AllowedOrigins string
// ZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests
AllowedHeaders string
// TLS configures secure transport for Zipkin endpoint to collect spans
TLS tlscfg.Options
}
// CollectorTags is the string representing collector tags to append to each and every span
CollectorTags map[string]string
// CollectorZipkinHTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
CollectorZipkinHTTPHostPort string
// CollectorZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from
CollectorZipkinAllowedOrigins string
// CollectorZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests
CollectorZipkinAllowedHeaders string
// CollectorGRPCMaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector.
CollectorGRPCMaxReceiveMessageLength int
// CollectorGRPCMaxConnectionAge is a duration for the maximum amount of time a connection may exist.
// See gRPC's keepalive.ServerParameters#MaxConnectionAge.
CollectorGRPCMaxConnectionAge time.Duration
// CollectorGRPCMaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
CollectorGRPCMaxConnectionAgeGrace time.Duration
}

// AddFlags adds flags for CollectorOptions
Expand All @@ -112,33 +121,33 @@ func AddFlags(flags *flag.FlagSet) {

// InitFromViper initializes CollectorOptions with properties from viper
func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) (*CollectorOptions, error) {
cOpts.CollectorGRPCHostPort = ports.FormatHostPort(v.GetString(collectorGRPCHostPort))
cOpts.CollectorHTTPHostPort = ports.FormatHostPort(v.GetString(collectorHTTPHostPort))
cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags))
cOpts.CollectorZipkinAllowedHeaders = v.GetString(collectorZipkinAllowedHeaders)
cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins)
cOpts.CollectorZipkinHTTPHostPort = ports.FormatHostPort(v.GetString(collectorZipkinHTTPHostPort))
cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes
cOpts.NumWorkers = v.GetInt(collectorNumWorkers)
cOpts.QueueSize = v.GetInt(collectorQueueSize)
cOpts.CollectorGRPCMaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength)
cOpts.CollectorGRPCMaxConnectionAge = v.GetDuration(collectorMaxConnectionAge)
cOpts.CollectorGRPCMaxConnectionAgeGrace = v.GetDuration(collectorMaxConnectionAgeGrace)
cOpts.GRPC.HostPort = ports.FormatHostPort(v.GetString(collectorGRPCHostPort))
cOpts.GRPC.MaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength)
cOpts.GRPC.MaxConnectionAge = v.GetDuration(collectorMaxConnectionAge)
cOpts.GRPC.MaxConnectionAgeGrace = v.GetDuration(collectorMaxConnectionAgeGrace)
if tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v); err == nil {
cOpts.TLSGRPC = tlsGrpc
cOpts.GRPC.TLS = tlsGrpc
} else {
return cOpts, fmt.Errorf("failed to parse gRPC TLS options: %w", err)
}
cOpts.HTTP.HostPort = ports.FormatHostPort(v.GetString(collectorHTTPHostPort))
if tlsHTTP, err := tlsHTTPFlagsConfig.InitFromViper(v); err == nil {
cOpts.TLSHTTP = tlsHTTP
cOpts.HTTP.TLS = tlsHTTP
} else {
return cOpts, fmt.Errorf("failed to parse HTTP TLS options: %w", err)
}
cOpts.Zipkin.AllowedHeaders = v.GetString(collectorZipkinAllowedHeaders)
cOpts.Zipkin.AllowedOrigins = v.GetString(collectorZipkinAllowedOrigins)
cOpts.Zipkin.HTTPHostPort = ports.FormatHostPort(v.GetString(collectorZipkinHTTPHostPort))
if tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v); err == nil {
cOpts.TLSZipkin = tlsZipkin
cOpts.Zipkin.TLS = tlsZipkin
} else {
return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err)
}
cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags))
cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes
cOpts.NumWorkers = v.GetInt(collectorNumWorkers)
cOpts.QueueSize = v.GetInt(collectorQueueSize)

return cOpts, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func TestCollectorOptionsWithFlags_CheckHostPort(t *testing.T) {
})
c.InitFromViper(v)

assert.Equal(t, ":5678", c.CollectorHTTPHostPort)
assert.Equal(t, ":1234", c.CollectorGRPCHostPort)
assert.Equal(t, ":3456", c.CollectorZipkinHTTPHostPort)
assert.Equal(t, ":5678", c.HTTP.HostPort)
assert.Equal(t, ":1234", c.GRPC.HostPort)
assert.Equal(t, ":3456", c.Zipkin.HTTPHostPort)
}

func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) {
Expand All @@ -49,9 +49,9 @@ func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) {
})
c.InitFromViper(v)

assert.Equal(t, ":5678", c.CollectorHTTPHostPort)
assert.Equal(t, "127.0.0.1:1234", c.CollectorGRPCHostPort)
assert.Equal(t, "0.0.0.0:3456", c.CollectorZipkinHTTPHostPort)
assert.Equal(t, ":5678", c.HTTP.HostPort)
assert.Equal(t, "127.0.0.1:1234", c.GRPC.HostPort)
assert.Equal(t, "0.0.0.0:3456", c.Zipkin.HTTPHostPort)
}

func TestCollectorOptionsWithFailedHTTPFlags(t *testing.T) {
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) {
})
c.InitFromViper(v)

assert.Equal(t, 8388608, c.CollectorGRPCMaxReceiveMessageLength)
assert.Equal(t, 8388608, c.GRPC.MaxReceiveMessageLength)
}

func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) {
Expand All @@ -113,6 +113,6 @@ func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) {
})
c.InitFromViper(v)

assert.Equal(t, 5*time.Minute, c.CollectorGRPCMaxConnectionAge)
assert.Equal(t, time.Minute, c.CollectorGRPCMaxConnectionAgeGrace)
assert.Equal(t, 5*time.Minute, c.GRPC.MaxConnectionAge)
assert.Equal(t, time.Minute, c.GRPC.MaxConnectionAgeGrace)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ require (
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/xdg-go/scram v1.1.1
go.opentelemetry.io/collector/pdata v0.51.0
go.opentelemetry.io/collector/semconv v0.51.0
go.opentelemetry.io/collector/pdata v0.52.0
go.opentelemetry.io/collector/semconv v0.52.0
go.uber.org/atomic v1.9.0
go.uber.org/automaxprocs v1.5.1
go.uber.org/zap v1.21.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -676,10 +676,10 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/collector/pdata v0.51.0 h1:J5CCnrg1iGOA1CfFOH+wAkMlJ1vjcm677kuPp18mbko=
go.opentelemetry.io/collector/pdata v0.51.0/go.mod h1:FsowYKNmf8CgsHgOfJv8V3KjALmy6FYQRHtXAOY3fho=
go.opentelemetry.io/collector/semconv v0.51.0 h1:Almgp3RZKDZNIp5491LfQ/oF2GG5gPK14IghjUxgkPg=
go.opentelemetry.io/collector/semconv v0.51.0/go.mod h1:SxK0rUnUP7YeDakexzbE/vhimTOHwE6m/4aKKd9e27Q=
go.opentelemetry.io/collector/pdata v0.52.0 h1:B0L9fkqKq5xRKFjICK9i11PRyTR52CCYSpTWaynf1Qc=
go.opentelemetry.io/collector/pdata v0.52.0/go.mod h1:GJUTfTv8mlYpHRjcmHXVbvJr48EW/q/P/HuBvpXAE58=
go.opentelemetry.io/collector/semconv v0.52.0 h1:ogYkQ6WL01xQ4GGSwWQejNTQwy/Pwcd6jCKFLSd7svA=
go.opentelemetry.io/collector/semconv v0.52.0/go.mod h1:SxK0rUnUP7YeDakexzbE/vhimTOHwE6m/4aKKd9e27Q=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down