Skip to content

Commit

Permalink
Make agent reporters configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed Oct 2, 2018
1 parent d73db1e commit 2a11e96
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 98 deletions.
2 changes: 2 additions & 0 deletions cmd/agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

func TestAgentStartError(t *testing.T) {
cfg := &Builder{}
configureSamplingManager(t, cfg)
agent, err := cfg.CreateAgent(zap.NewNop())
require.NoError(t, err)
agent.httpServer.Addr = "bad-address"
Expand Down Expand Up @@ -100,6 +101,7 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) {
},
}
logger, logBuf := testutils.NewLogger()
configureSamplingManager(t, &cfg)
agent, err := cfg.CreateAgent(logger)
require.NoError(t, err)
ch := make(chan error, 2)
Expand Down
77 changes: 41 additions & 36 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@ package app
import (
"fmt"
"net/http"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/tchannel-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/processors"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
tchreporter "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers"
"github.com/jaegertracing/jaeger/cmd/agent/app/servers/thriftudp"
jmetrics "github.com/jaegertracing/jaeger/pkg/metrics"
Expand All @@ -37,11 +34,9 @@ import (
)

const (
defaultQueueSize = 1000
defaultMaxPacketSize = 65000
defaultServerWorkers = 10
defaultMinPeers = 3
defaultConnCheckTimeout = 250 * time.Millisecond
defaultQueueSize = 1000
defaultMaxPacketSize = 65000
defaultServerWorkers = 10

defaultHTTPServerHostPort = ":5778"

Expand Down Expand Up @@ -73,10 +68,9 @@ type Builder struct {
HTTPServer HTTPServerConfiguration `yaml:"httpServer"`
Metrics jmetrics.Builder `yaml:"metrics"`

tchreporter.Builder `yaml:",inline"`

otherReporters []reporter.Reporter
reporters []reporter.Reporter
metricsFactory metrics.Factory
configManager httpserver.ClientConfigManager
}

// ProcessorConfiguration holds config for a processor that receives spans from Server
Expand All @@ -99,9 +93,9 @@ type HTTPServerConfiguration struct {
HostPort string `yaml:"hostPort" validate:"nonzero"`
}

// WithReporter adds auxiliary reporters.
func (b *Builder) WithReporter(r reporter.Reporter) *Builder {
b.otherReporters = append(b.otherReporters, r)
// WithReporters adds auxiliary reporters.
func (b *Builder) WithReporters(r ...reporter.Reporter) *Builder {
b.reporters = append(b.reporters, r...)
return b
}

Expand All @@ -111,47 +105,46 @@ func (b *Builder) WithMetricsFactory(mf metrics.Factory) *Builder {
return b
}

func (b *Builder) createMainReporter(mFactory metrics.Factory, logger *zap.Logger) (*tchreporter.Reporter, error) {
return b.CreateReporter(mFactory, logger)
}

func (b *Builder) getMetricsFactory() (metrics.Factory, error) {
// GetMetricsFactory returns metrics factory used by the agent.
func (b *Builder) GetMetricsFactory() (metrics.Factory, error) {
if b.metricsFactory != nil {
return b.metricsFactory, nil
}

baseFactory, err := b.Metrics.CreateMetricsFactory("jaeger")
fmt.Println("creating metrics factory")
if err != nil {
return nil, err
}

return baseFactory.Namespace("agent", nil), nil
fmt.Println("creating metrics factory")
b.metricsFactory = baseFactory.Namespace("agent", nil)
return b.metricsFactory, nil
}

// CreateAgent creates the Agent
func (b *Builder) CreateAgent(logger *zap.Logger) (*Agent, error) {
mFactory, err := b.getMetricsFactory()
mFactory, err := b.GetMetricsFactory()
if err != nil {
return nil, errors.Wrap(err, "cannot create metrics factory")
}
mainReporter, err := b.createMainReporter(mFactory, logger)

if err != nil {
return nil, errors.Wrap(err, "cannot create main Reporter")
}
var rep reporter.Reporter = mainReporter
if len(b.otherReporters) > 0 {
reps := append([]reporter.Reporter{mainReporter}, b.otherReporters...)
rep = reporter.NewMultiReporter(reps...)
return nil, err
}
processors, err := b.GetProcessors(rep, mFactory, logger)
processors, err := b.GetProcessors(b.getReporter(logger), mFactory, logger)
if err != nil {
return nil, err
}
httpServer := b.HTTPServer.GetHTTPServer(b.CollectorServiceName, mainReporter.Channel(), mFactory)
if h := b.Metrics.Handler(); mFactory != nil && h != nil {
httpServer.Handler.(*http.ServeMux).Handle(b.Metrics.HTTPRoute, h)
server, err := b.HTTPServer.getHTTPServer(b.configManager, mFactory, &b.Metrics)
if err != nil {
return nil, err
}
return NewAgent(processors, httpServer, logger), nil
return NewAgent(processors, server, logger), nil
}

func (b *Builder) getReporter(logger *zap.Logger) reporter.Reporter {
return reporter.NewMultiReporter(b.reporters...)
}

// GetProcessors creates Processors with attached Reporter
Expand Down Expand Up @@ -184,13 +177,25 @@ func (b *Builder) GetProcessors(rep reporter.Reporter, mFactory metrics.Factory,
return retMe, nil
}

// WithClientConfigManager adds configuration manager.
func (b *Builder) WithClientConfigManager(manager httpserver.ClientConfigManager) *Builder {
b.configManager = manager
return b
}

// GetHTTPServer creates an HTTP server that provides sampling strategies and baggage restrictions to client libraries.
func (c HTTPServerConfiguration) GetHTTPServer(svc string, channel *tchannel.Channel, mFactory metrics.Factory) *http.Server {
mgr := httpserver.NewCollectorProxy(svc, channel, mFactory)
func (c HTTPServerConfiguration) getHTTPServer(manager httpserver.ClientConfigManager, mFactory metrics.Factory, mBuilder *jmetrics.Builder) (*http.Server, error) {
if manager == nil {
return nil, errors.New("Http manager is null")
}
if c.HostPort == "" {
c.HostPort = defaultHTTPServerHostPort
}
return httpserver.NewHTTPServer(c.HostPort, mgr, mFactory)
server := httpserver.NewHTTPServer(c.HostPort, manager, mFactory)
if h := mBuilder.Handler(); mFactory != nil && h != nil {
server.Handler.(*http.ServeMux).Handle(mBuilder.HTTPRoute, h)
}
return server, nil
}

// GetThriftProcessor gets a TBufferedServer backed Processor using the collector configuration
Expand Down
46 changes: 14 additions & 32 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package app

import (
"errors"
"strings"
"testing"

Expand All @@ -25,6 +24,8 @@ import (
"go.uber.org/zap"
"gopkg.in/yaml.v2"

"github.com/jaegertracing/jaeger/cmd/agent/app/httpserver"
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)
Expand All @@ -51,14 +52,6 @@ processors:
httpServer:
hostPort: 4.4.4.4:5778
collectorHostPorts:
- 127.0.0.1:14267
- 127.0.0.1:14268
- 127.0.0.1:14269
collectorServiceName: some-collector-service
minPeers: 4
`

func TestBuilderFromConfig(t *testing.T) {
Expand Down Expand Up @@ -101,18 +94,12 @@ func TestBuilderFromConfig(t *testing.T) {
},
}, cfg.Processors[2])
assert.Equal(t, "4.4.4.4:5778", cfg.HTTPServer.HostPort)

assert.Equal(t, 4, cfg.DiscoveryMinPeers)
assert.Equal(t, "some-collector-service", cfg.CollectorServiceName)
assert.Equal(
t,
[]string{"127.0.0.1:14267", "127.0.0.1:14268", "127.0.0.1:14269"},
cfg.CollectorHostPorts)
}

func TestBuilderWithExtraReporter(t *testing.T) {
cfg := &Builder{}
cfg.WithReporter(fakeReporter{})
configureSamplingManager(t, cfg)
cfg.WithReporters(fakeReporter{})
agent, err := cfg.CreateAgent(zap.NewNop())
assert.NoError(t, err)
assert.NotNil(t, agent)
Expand All @@ -121,13 +108,14 @@ func TestBuilderWithExtraReporter(t *testing.T) {
func TestBuilderMetrics(t *testing.T) {
mf := metrics.NullFactory
b := new(Builder).WithMetricsFactory(mf)
mf2, err := b.getMetricsFactory()
mf2, err := b.GetMetricsFactory()
assert.NoError(t, err)
assert.Equal(t, mf, mf2)
}

func TestBuilderMetricsHandler(t *testing.T) {
b := &Builder{}
configureSamplingManager(t, b)
b.Metrics.Backend = "expvar"
b.Metrics.HTTPRoute = "/expvar"
factory, err := b.Metrics.CreateMetricsFactory("test")
Expand All @@ -146,14 +134,6 @@ func TestBuilderMetricsError(t *testing.T) {
assert.EqualError(t, err, "cannot create metrics factory: unknown metrics backend specified")
}

func TestBuilderWithDiscoveryError(t *testing.T) {
cfg := &Builder{}
cfg.WithDiscoverer(fakeDiscoverer{})
agent, err := cfg.CreateAgent(zap.NewNop())
assert.EqualError(t, err, "cannot create main Reporter: cannot enable service discovery: both discovery.Discoverer and discovery.Notifier must be specified")
assert.Nil(t, agent)
}

func TestBuilderWithProcessorErrors(t *testing.T) {
testCases := []struct {
model Model
Expand Down Expand Up @@ -190,6 +170,14 @@ func TestBuilderWithProcessorErrors(t *testing.T) {
}
}

func configureSamplingManager(t *testing.T, cfg *Builder) {
m, err := cfg.GetMetricsFactory()
require.NoError(t, err)
r, err := tchannel.NewBuilder().CreateReporter(m, zap.NewNop())
require.NoError(t, err)
cfg.WithReporters(r).WithClientConfigManager(httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), m))
}

type fakeReporter struct{}

func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) {
Expand All @@ -199,9 +187,3 @@ func (fr fakeReporter) EmitZipkinBatch(spans []*zipkincore.Span) (err error) {
func (fr fakeReporter) EmitBatch(batch *jaeger.Batch) (err error) {
return nil
}

type fakeDiscoverer struct{}

func (fd fakeDiscoverer) Instances() ([]string, error) {
return nil, errors.New("discoverer error")
}
21 changes: 0 additions & 21 deletions cmd/agent/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package app
import (
"flag"
"fmt"
"strings"

"github.com/spf13/viper"
)
Expand All @@ -27,10 +26,7 @@ const (
suffixServerQueueSize = "server-queue-size"
suffixServerMaxPacketSize = "server-max-packet-size"
suffixServerHostPort = "server-host-port"
collectorHostPort = "collector.host-port"
httpServerHostPort = "http-server.host-port"
discoveryMinPeers = "discovery.min-peers"
discoveryConnCheckTimeout = "discovery.conn-check-timeout"
)

var defaultProcessors = []struct {
Expand All @@ -52,22 +48,10 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(prefix+suffixServerMaxPacketSize, defaultMaxPacketSize, "max packet size for the UDP server")
flags.String(prefix+suffixServerHostPort, processor.hostPort, "host:port for the UDP server")
}
flags.String(
collectorHostPort,
"",
"comma-separated string representing host:ports of a static list of collectors to connect to directly (e.g. when not using service discovery)")
flags.String(
httpServerHostPort,
defaultHTTPServerHostPort,
"host:port of the http server (e.g. for /sampling point and /baggage endpoint)")
flags.Int(
discoveryMinPeers,
defaultMinPeers,
"if using service discovery, the min number of connections to maintain to the backend")
flags.Duration(
discoveryConnCheckTimeout,
defaultConnCheckTimeout,
"sets the timeout used when establishing new connections")
}

// InitFromViper initializes Builder with properties retrieved from Viper.
Expand All @@ -84,11 +68,6 @@ func (b *Builder) InitFromViper(v *viper.Viper) *Builder {
b.Processors = append(b.Processors, *p)
}

if len(v.GetString(collectorHostPort)) > 0 {
b.CollectorHostPorts = strings.Split(v.GetString(collectorHostPort), ",")
}
b.HTTPServer.HostPort = v.GetString(httpServerHostPort)
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
b.ConnCheckTimeout = v.GetDuration(discoveryConnCheckTimeout)
return b
}
4 changes: 0 additions & 4 deletions cmd/agent/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ func TestBingFlags(t *testing.T) {
v.BindPFlags(command.PersistentFlags())

err := command.ParseFlags([]string{
"--collector.host-port=1.2.3.4:555,1.2.3.4:666",
"--discovery.min-peers=42",
"--http-server.host-port=:8080",
"--processor.jaeger-binary.server-host-port=:1111",
"--processor.jaeger-binary.server-max-packet-size=4242",
Expand All @@ -46,8 +44,6 @@ func TestBingFlags(t *testing.T) {

b.InitFromViper(v)
assert.Equal(t, 3, len(b.Processors))
assert.Equal(t, []string{"1.2.3.4:555", "1.2.3.4:666"}, b.CollectorHostPorts)
assert.Equal(t, 42, b.DiscoveryMinPeers)
assert.Equal(t, ":8080", b.HTTPServer.HostPort)
assert.Equal(t, ":1111", b.Processors[2].Server.HostPort)
assert.Equal(t, 4242, b.Processors[2].Server.MaxPacketSize)
Expand Down
Loading

0 comments on commit 2a11e96

Please sign in to comment.