Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed May 26, 2022
1 parent 1c68ad5 commit 54417be
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ by default uses only in-memory database.`,
// agent
// if the agent reporter grpc host:port was not explicitly set then use whatever the collector is listening on
if len(grpcBuilder.CollectorHostPorts) == 0 {
grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, cOpts.GRPCHostPort)
grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, cOpts.GRPC.HostPort)
}
agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})
builders := map[agentRep.Type]agentApp.CollectorProxyBuilder{
Expand Down
28 changes: 14 additions & 14 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,24 @@ func (c *Collector) Start(options *CollectorOptions) error {
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: options.GRPCHostPort,
HostPort: options.GRPC.HostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: options.TLSGRPC,
TLSConfig: options.GRPC.TLS,
SamplingStore: c.strategyStore,
Logger: c.logger,
MaxReceiveMessageLength: options.GRPCMaxReceiveMessageLength,
MaxConnectionAge: options.GRPCMaxConnectionAge,
MaxConnectionAgeGrace: options.GRPCMaxConnectionAgeGrace,
MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength,
MaxConnectionAge: options.GRPC.MaxConnectionAge,
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector %w", err)
}
c.grpcServer = grpcServer

httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: options.HTTPHostPort,
HostPort: options.HTTP.HostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.TLSHTTP,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Expand All @@ -124,16 +124,16 @@ func (c *Collector) Start(options *CollectorOptions) error {
}
c.hServer = httpServer

c.tlsGRPCCertWatcherCloser = &options.TLSGRPC
c.tlsHTTPCertWatcherCloser = &options.TLSHTTP
c.tlsZipkinCertWatcherCloser = &options.TLSZipkin
c.tlsGRPCCertWatcherCloser = &options.GRPC.TLS
c.tlsHTTPCertWatcherCloser = &options.HTTP.TLS
c.tlsZipkinCertWatcherCloser = &options.Zipkin.TLS
zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{
HostPort: options.ZipkinHTTPHostPort,
HostPort: options.Zipkin.HTTPHostPort,
Handler: c.spanHandlers.ZipkinSpansHandler,
TLSConfig: options.TLSZipkin,
TLSConfig: options.Zipkin.TLS,
HealthCheck: c.hCheck,
AllowedHeaders: options.ZipkinAllowedHeaders,
AllowedOrigins: options.ZipkinAllowedOrigins,
AllowedHeaders: options.Zipkin.AllowedHeaders,
AllowedOrigins: options.Zipkin.AllowedOrigins,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
})
Expand Down
153 changes: 153 additions & 0 deletions cmd/collector/app/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (c) 2019 The Jaeger Authors.
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"flag"
"fmt"
"time"

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/cmd/flags"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/ports"
)

const (
collectorDynQueueSizeMemory = "collector.queue-size-memory"
collectorGRPCHostPort = "collector.grpc-server.host-port"
collectorHTTPHostPort = "collector.http-server.host-port"
collectorNumWorkers = "collector.num-workers"
collectorQueueSize = "collector.queue-size"
collectorTags = "collector.tags"
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
collectorZipkinAllowedOrigins = "collector.zipkin.allowed-origins"
collectorZipkinHTTPHostPort = "collector.zipkin.host-port"
collectorGRPCMaxReceiveMessageLength = "collector.grpc-server.max-message-size"
collectorMaxConnectionAge = "collector.grpc-server.max-connection-age"
collectorMaxConnectionAgeGrace = "collector.grpc-server.max-connection-age-grace"
)

var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.grpc",
}

var tlsHTTPFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.http",
}

var tlsZipkinFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.zipkin",
}

// CollectorOptions holds configuration for collector
type CollectorOptions struct {
// DynQueueSizeMemory determines how much memory to use for the queue
DynQueueSizeMemory uint
// QueueSize is the size of collector's queue
QueueSize int
// NumWorkers is the number of internal workers in a collector
NumWorkers int
// HTTP section defines options for HTTP server
HTTP struct {
// HostPort is the host:port address that the collector service listens in on for http requests
HostPort string
// TLS configures secure transport for HTTP endpoint to collect spans
TLS tlscfg.Options
}
// GRPC section defines options for gRPC server
GRPC struct {
// HostPort is the host:port address that the collector service listens in on for gRPC requests
HostPort string
// TLS configures secure transport for gRPC endpoint to collect spans
TLS tlscfg.Options
// MaxReceiveMessageLength is the maximum message size receivable by the gRPC Collector.
MaxReceiveMessageLength int
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist.
// See gRPC's keepalive.ServerParameters#MaxConnectionAge.
MaxConnectionAge time.Duration
// MaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
}
// Zipkin section defines options for Zipkin HTTP server
Zipkin struct {
// HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
HTTPHostPort string
// ZipkinAllowedOrigins is a list of origins a cross-domain request to the Zipkin collector service can be executed from
AllowedOrigins string
// ZipkinAllowedHeaders is a list of headers that the Zipkin collector service allowes the client to use with cross-domain requests
AllowedHeaders string
// TLS configures secure transport for Zipkin endpoint to collect spans
TLS tlscfg.Options
}
// CollectorTags is the string representing collector tags to append to each and every span
CollectorTags map[string]string
}

// AddFlags adds flags for CollectorOptions
func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorNumWorkers, DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Int(collectorQueueSize, DefaultQueueSize, "The queue size of the collector")
flags.Int(collectorGRPCMaxReceiveMessageLength, DefaultGRPCMaxReceiveMessageLength, "The maximum receivable message size for the collector's GRPC server")
flags.String(collectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:14250 or :14250) of the collector's GRPC server")
flags.String(collectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:14268 or :14268) of the collector's HTTP server")
flags.String(collectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}")
flags.String(collectorZipkinAllowedHeaders, "content-type", "Comma separated list of allowed headers for the Zipkin collector service, default content-type")
flags.String(collectorZipkinAllowedOrigins, "*", "Comma separated list of allowed origins for the Zipkin collector service, default accepts all")
flags.String(collectorZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)")
flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.")
flags.Duration(collectorMaxConnectionAge, 0, "The maximum amount of time a connection may exist. Set this value to a few seconds or minutes on highly elastic environments, so that clients discover new collector nodes frequently. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters")
flags.Duration(collectorMaxConnectionAgeGrace, 0, "The additive period after MaxConnectionAge after which the connection will be forcibly closed. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters")

tlsGRPCFlagsConfig.AddFlags(flags)
tlsHTTPFlagsConfig.AddFlags(flags)
tlsZipkinFlagsConfig.AddFlags(flags)
}

// InitFromViper initializes CollectorOptions with properties from viper
func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) (*CollectorOptions, error) {
cOpts.GRPC.HostPort = ports.FormatHostPort(v.GetString(collectorGRPCHostPort))
cOpts.GRPC.MaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength)
cOpts.GRPC.MaxConnectionAge = v.GetDuration(collectorMaxConnectionAge)
cOpts.GRPC.MaxConnectionAgeGrace = v.GetDuration(collectorMaxConnectionAgeGrace)
if tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v); err == nil {
cOpts.GRPC.TLS = tlsGrpc
} else {
return cOpts, fmt.Errorf("failed to parse gRPC TLS options: %w", err)
}
cOpts.HTTP.HostPort = ports.FormatHostPort(v.GetString(collectorHTTPHostPort))
if tlsHTTP, err := tlsHTTPFlagsConfig.InitFromViper(v); err == nil {
cOpts.HTTP.TLS = tlsHTTP
} else {
return cOpts, fmt.Errorf("failed to parse HTTP TLS options: %w", err)
}
cOpts.Zipkin.AllowedHeaders = v.GetString(collectorZipkinAllowedHeaders)
cOpts.Zipkin.AllowedOrigins = v.GetString(collectorZipkinAllowedOrigins)
cOpts.Zipkin.HTTPHostPort = ports.FormatHostPort(v.GetString(collectorZipkinHTTPHostPort))
if tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v); err == nil {
cOpts.Zipkin.TLS = tlsZipkin
} else {
return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err)
}
cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags))
cOpts.DynQueueSizeMemory = v.GetUint(collectorDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes
cOpts.NumWorkers = v.GetInt(collectorNumWorkers)
cOpts.QueueSize = v.GetInt(collectorQueueSize)

return cOpts, nil
}
118 changes: 118 additions & 0 deletions cmd/collector/app/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/pkg/config"
)

func TestCollectorOptionsWithFlags_CheckHostPort(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--collector.http-server.host-port=5678",
"--collector.grpc-server.host-port=1234",
"--collector.zipkin.host-port=3456",
})
c.InitFromViper(v)

assert.Equal(t, ":5678", c.HTTP.HostPort)
assert.Equal(t, ":1234", c.GRPC.HostPort)
assert.Equal(t, ":3456", c.Zipkin.HTTPHostPort)
}

func TestCollectorOptionsWithFlags_CheckFullHostPort(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--collector.http-server.host-port=:5678",
"--collector.grpc-server.host-port=127.0.0.1:1234",
"--collector.zipkin.host-port=0.0.0.0:3456",
})
c.InitFromViper(v)

assert.Equal(t, ":5678", c.HTTP.HostPort)
assert.Equal(t, "127.0.0.1:1234", c.GRPC.HostPort)
assert.Equal(t, "0.0.0.0:3456", c.Zipkin.HTTPHostPort)
}

func TestCollectorOptionsWithFailedHTTPFlags(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
"--collector.http.tls.enabled=false",
"--collector.http.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = c.InitFromViper(v)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse HTTP TLS options")
}

func TestCollectorOptionsWithFailedGRPCFlags(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
"--collector.grpc.tls.enabled=false",
"--collector.grpc.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = c.InitFromViper(v)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse gRPC TLS options")
}

func TestCollectorOptionsWithFailedZipkinFlags(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags([]string{
"--collector.zipkin.tls.enabled=false",
"--collector.zipkin.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = c.InitFromViper(v)
require.Error(t, err)
assert.Contains(t, err.Error(), "failed to parse Zipkin TLS options")
}

func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--collector.grpc-server.max-message-size=8388608",
})
c.InitFromViper(v)

assert.Equal(t, 8388608, c.GRPC.MaxReceiveMessageLength)
}

func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) {
c := &CollectorOptions{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--collector.grpc-server.max-connection-age=5m",
"--collector.grpc-server.max-connection-age-grace=1m",
})
c.InitFromViper(v)

assert.Equal(t, 5*time.Minute, c.GRPC.MaxConnectionAge)
assert.Equal(t, time.Minute, c.GRPC.MaxConnectionAgeGrace)
}

0 comments on commit 54417be

Please sign in to comment.