Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix move otel to metrics and traces register #854

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 6 additions & 42 deletions backend_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package krakend

import (
"context"
"errors"
"fmt"

otelconfig "github.com/krakend/krakend-otel/config"
otellura "github.com/krakend/krakend-otel/lura"
otelstate "github.com/krakend/krakend-otel/state"
amqp "github.com/krakendio/krakend-amqp/v2"
cel "github.com/krakendio/krakend-cel/v2"
cb "github.com/krakendio/krakend-circuitbreaker/v2/gobreaker/proxy"
Expand Down Expand Up @@ -43,7 +40,7 @@ func NewBackendFactory(logger logging.Logger, metricCollector *metrics.Metrics)
return NewBackendFactoryWithContext(context.Background(), logger, metricCollector)
}

func newRequestExecutorFactory(logger logging.Logger, otelCfg *otelconfig.Config) func(*config.Backend) client.HTTPRequestExecutor {
func newRequestExecutorFactory(logger logging.Logger) func(*config.Backend) client.HTTPRequestExecutor {
requestExecutorFactory := func(cfg *config.Backend) client.HTTPRequestExecutor {
clientFactory := client.NewHTTPClient
if _, ok := cfg.ExtraConfig[oauth2client.Namespace]; ok {
Expand All @@ -52,18 +49,15 @@ func newRequestExecutorFactory(logger logging.Logger, otelCfg *otelconfig.Config
clientFactory = httpcache.NewHTTPClient(cfg, clientFactory)
}

if otelCfg != nil {
clientFactory = otellura.InstrumentedHTTPClientFactory(clientFactory,
cfg, otelCfg.Layers.Backend, otelCfg.SkipPaths, otelstate.GlobalState)
}
clientFactory = otellura.InstrumentedHTTPClientFactory(clientFactory, cfg)
// TODO: check what happens if we have both, opencensus and otel enabled ?
return opencensus.HTTPRequestExecutorFromConfig(clientFactory, cfg)
}
return httprequestexecutor.HTTPRequestExecutor(logger, requestExecutorFactory)
}

func internalNewBackendFactory(ctx context.Context, requestExecutorFactory func(*config.Backend) client.HTTPRequestExecutor,
logger logging.Logger, metricCollector *metrics.Metrics, otelCfg *otelconfig.Config) proxy.BackendFactory {
logger logging.Logger, metricCollector *metrics.Metrics) proxy.BackendFactory {

backendFactory := martian.NewConfiguredBackendFactory(logger, requestExecutorFactory)
bf := pubsub.NewBackendFactory(ctx, logger, backendFactory)
Expand All @@ -76,12 +70,7 @@ func internalNewBackendFactory(ctx context.Context, requestExecutorFactory func(
backendFactory = cb.BackendFactory(backendFactory, logger)
backendFactory = metricCollector.BackendFactory("backend", backendFactory)
backendFactory = opencensus.BackendFactory(backendFactory)

if otelCfg != nil {
// full backend processing instrumentation:
backendFactory = otellura.BackendFactory(backendFactory, otelstate.GlobalState, otelCfg.Layers.Backend,
otelCfg.SkipPaths)
}
backendFactory = otellura.BackendFactory(backendFactory)
return func(remote *config.Backend) proxy.Proxy {
logger.Debug(fmt.Sprintf("[BACKEND: %s] Building the backend pipe", remote.URLPattern))
return backendFactory(remote)
Expand All @@ -90,37 +79,12 @@ func internalNewBackendFactory(ctx context.Context, requestExecutorFactory func(

// NewBackendFactoryWithContext creates a BackendFactory by stacking all the available middlewares and injecting the received context
func NewBackendFactoryWithContext(ctx context.Context, logger logging.Logger, metricCollector *metrics.Metrics) proxy.BackendFactory {
requestExecutorFactory := newRequestExecutorFactory(logger, nil)
return internalNewBackendFactory(ctx, requestExecutorFactory, logger, metricCollector, nil)
}

func newBackendFactoryWithOTELConfig(ctx context.Context, logger logging.Logger,
metricCollector *metrics.Metrics, otelCfg *otelconfig.Config) proxy.BackendFactory {

if otelCfg == nil {
return NewBackendFactoryWithContext(ctx, logger, metricCollector)
}

requestExecutorFactory := newRequestExecutorFactory(logger, otelCfg)
return internalNewBackendFactory(ctx, requestExecutorFactory, logger, metricCollector, otelCfg)
requestExecutorFactory := newRequestExecutorFactory(logger)
return internalNewBackendFactory(ctx, requestExecutorFactory, logger, metricCollector)
}

type backendFactory struct{}

func (backendFactory) NewBackendFactory(ctx context.Context, l logging.Logger, m *metrics.Metrics) proxy.BackendFactory {
return NewBackendFactoryWithContext(ctx, l, m)
}

func (backendFactory) NewBackendFactoryWithConfig(ctx context.Context, l logging.Logger,
metrics *metrics.Metrics, serviceCfg *config.ServiceConfig) proxy.BackendFactory {

var otelCfg *otelconfig.Config
if serviceCfg != nil {
var err error
otelCfg, err = otelconfig.FromLura(*serviceCfg)
if err != nil && !errors.Is(err, otelconfig.ErrNoConfig) {
l.Error(fmt.Sprintf("cannot load OpenTelemetry config: %s", err.Error()))
}
}
return newBackendFactoryWithOTELConfig(ctx, l, metrics, otelCfg)
}
64 changes: 29 additions & 35 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ import (
"golang.org/x/sync/errgroup"

kotel "github.com/krakend/krakend-otel"
otelconfig "github.com/krakend/krakend-otel/config"
otellura "github.com/krakend/krakend-otel/lura"
otelgin "github.com/krakend/krakend-otel/router/gin"
otelstate "github.com/krakend/krakend-otel/state"
krakendbf "github.com/krakendio/bloomfilter/v2/krakend"
asyncamqp "github.com/krakendio/krakend-amqp/v2/async"
audit "github.com/krakendio/krakend-audit"
Expand Down Expand Up @@ -151,11 +149,12 @@ type ExecutorBuilder struct {
TokenRejecterFactory TokenRejecterFactory
MetricsAndTracesRegister MetricsAndTracesRegister
EngineFactory EngineFactory
ProxyFactory ProxyFactory
BackendFactory BackendFactory
HandlerFactory HandlerFactory
RunServerFactory RunServerFactory
AgentStarterFactory AgentStarter

ProxyFactory ProxyFactory
BackendFactory BackendFactory
HandlerFactory HandlerFactory
RunServerFactory RunServerFactory
AgentStarterFactory AgentStarter

Middlewares []gin.HandlerFunc
}
Expand All @@ -175,12 +174,6 @@ func (e *ExecutorBuilder) NewCmdExecutor(ctx context.Context) cmd.Executor {
return
}

shutdownFn, err := kotel.Register(ctx, logger, cfg)
if err != nil {
logger.Error(fmt.Sprintf("[SERVICE: OpenTelemetry] cannot register exporters: %s", err.Error()))
}
defer shutdownFn()

logger.Info(fmt.Sprintf("Starting KrakenD v%s", core.KrakendVersion))
startReporter(ctx, logger, cfg)

Expand All @@ -193,6 +186,9 @@ func (e *ExecutorBuilder) NewCmdExecutor(ctx context.Context) cmd.Executor {
}

metricCollector := e.MetricsAndTracesRegister.Register(ctx, cfg, logger)
if metricsAndTracesCloser, ok := e.MetricsAndTracesRegister.(io.Closer); ok {
defer metricsAndTracesCloser.Close()
}

// Initializes the global cache for the JWK clients if enabled in the config
if err := jose.SetGlobalCacher(logger, cfg.ExtraConfig); err != nil && err != jose.ErrNoValidatorCfg {
Expand All @@ -208,32 +204,16 @@ func (e *ExecutorBuilder) NewCmdExecutor(ctx context.Context) cmd.Executor {
logger.Warning("[SERVICE: Bloomfilter]", err.Error())
}

var bpf proxy.BackendFactory
if bfwc, ok := e.BackendFactory.(BackendFactoryWithConfig); ok {
bpf = bfwc.NewBackendFactoryWithConfig(ctx, logger, metricCollector, &cfg)
} else {
bpf = e.BackendFactory.NewBackendFactory(ctx, logger, metricCollector)
}

var pf proxy.Factory
if pfwc, ok := e.ProxyFactory.(ProxyFactoryWithConfig); ok {
pf = pfwc.NewProxyFactoryWithConfig(logger, bpf, metricCollector, &cfg)
} else {
pf = e.ProxyFactory.NewProxyFactory(logger, bpf, metricCollector)
}
bpf := e.BackendFactory.NewBackendFactory(ctx, logger, metricCollector)
pf := e.ProxyFactory.NewProxyFactory(logger, bpf, metricCollector)

agentPing := make(chan string, len(cfg.AsyncAgents))

handlerF := e.HandlerFactory.NewHandlerFactory(logger, metricCollector, tokenRejecterFactory)
otelCfg, err := otelconfig.FromLura(cfg)
if err == nil {
handlerF = otelgin.New(handlerF, otelCfg.SkipPaths)
}
handlerF = otelgin.New(handlerF)

runServerChain := serverhttp.RunServerWithLoggerFactory(logger)
if otelCfg != nil {
runServerChain = otellura.GlobalRunServer(logger, otelCfg, otelstate.GlobalState, runServerChain)
}
runServerChain = otellura.GlobalRunServer(logger, runServerChain)
runServerChain = router.RunServerFunc(e.RunServerFactory.NewRunServer(logger, runServerChain))

// setup the krakend router
Expand Down Expand Up @@ -403,10 +383,12 @@ func (BloomFilterJWT) NewTokenRejecter(ctx context.Context, cfg config.ServiceCo
}

// MetricsAndTraces is the default implementation of the MetricsAndTracesRegister interface.
type MetricsAndTraces struct{}
type MetricsAndTraces struct {
shutdownFn func()
}

// Register registers the metrics, influx and opencensus packages as required by the given configuration.
func (MetricsAndTraces) Register(ctx context.Context, cfg config.ServiceConfig, l logging.Logger) *metrics.Metrics {
func (m MetricsAndTraces) Register(ctx context.Context, cfg config.ServiceConfig, l logging.Logger) *metrics.Metrics {
metricCollector := metrics.New(ctx, cfg.ExtraConfig, l)

if err := influxdb.New(ctx, cfg.ExtraConfig, metricCollector, l); err != nil {
Expand All @@ -425,9 +407,21 @@ func (MetricsAndTraces) Register(ctx context.Context, cfg config.ServiceConfig,
l.Debug("[SERVICE: OpenCensus] Service correctly registered")
}

if shutdownFn, err := kotel.Register(ctx, l, cfg); err == nil {
m.shutdownFn = shutdownFn
} else {
l.Error(fmt.Sprintf("[SERVICE: OpenTelemetry] cannot register exporters: %s", err.Error()))
}

return metricCollector
}

func (m MetricsAndTraces) Close() {
if m.shutdownFn != nil {
m.shutdownFn()
}
}

const (
usageDisable = "USAGE_DISABLE"
usageDelay = 5 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.20
require (
github.com/gin-gonic/gin v1.9.1
github.com/go-contrib/uuid v1.2.0
github.com/krakend/krakend-otel v0.1.0
github.com/krakend/krakend-otel v0.1.1-0.20240306204602-74f75bd9b47d
github.com/krakendio/bloomfilter/v2 v2.0.4
github.com/krakendio/krakend-amqp/v2 v2.1.0
github.com/krakendio/krakend-audit v0.0.5
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,12 @@ github.com/krakend/go-auth0 v1.0.0 h1:dewhsVnquDStTXaRY5OOCL+i4oe+qKbDpaNz9D6Kzu
github.com/krakend/go-auth0 v1.0.0/go.mod h1:lJtS6u8y1mai4lFcE3JC2oSDQaNo2aXexTS0cqTblZU=
github.com/krakend/krakend-otel v0.1.0 h1:tABQ90JQQ+FJ6hL3ZpfDf1zgfkKU+xDCAExM7wAeB3E=
github.com/krakend/krakend-otel v0.1.0/go.mod h1:BHe8HiYJ7d45Rk/wGN0XtBtJBsyeNxUIZF+YMlgorbA=
github.com/krakend/krakend-otel v0.1.1-0.20240305190110-01ffdc62b627 h1:eShXVys6OmHYtxQa+o69jJ12kYllEjlJCRbMw2OaApA=
github.com/krakend/krakend-otel v0.1.1-0.20240305190110-01ffdc62b627/go.mod h1:BHe8HiYJ7d45Rk/wGN0XtBtJBsyeNxUIZF+YMlgorbA=
github.com/krakend/krakend-otel v0.1.1-0.20240306204243-50705443527a h1:q43/zjzjip89L76O2U680z9NTz3kvyKeUnx/1h1M+bg=
github.com/krakend/krakend-otel v0.1.1-0.20240306204243-50705443527a/go.mod h1:BHe8HiYJ7d45Rk/wGN0XtBtJBsyeNxUIZF+YMlgorbA=
github.com/krakend/krakend-otel v0.1.1-0.20240306204602-74f75bd9b47d h1:DD5x8/D/tEW7HA/tRazrFAiHaWhQPifTfRQtyGF7NWY=
github.com/krakend/krakend-otel v0.1.1-0.20240306204602-74f75bd9b47d/go.mod h1:BHe8HiYJ7d45Rk/wGN0XtBtJBsyeNxUIZF+YMlgorbA=
github.com/krakendio/binder v0.0.0-20230413105421-1bbe94e65f45 h1:5UbGH+Sa62LMtbBn6m3EEcaA4JFxaDd91X/X0SsZuO8=
github.com/krakendio/binder v0.0.0-20230413105421-1bbe94e65f45/go.mod h1:VgJK/LM9NwyIxzYETGXQeRxdDg0IgBw4Fol58JVx2+4=
github.com/krakendio/bloomfilter/v2 v2.0.4 h1:+FCe52Izx4mZgFevvifOMF+3WbbhmZT0xZnrnUE4C0U=
Expand Down
40 changes: 4 additions & 36 deletions proxy_factory.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package krakend

import (
"errors"
"fmt"

otelconfig "github.com/krakend/krakend-otel/config"
otellura "github.com/krakend/krakend-otel/lura"
otelstate "github.com/krakend/krakend-otel/state"
cel "github.com/krakendio/krakend-cel/v2"
jsonschema "github.com/krakendio/krakend-jsonschema/v2"
lua "github.com/krakendio/krakend-lua/v2/proxy"
Expand All @@ -17,8 +14,8 @@ import (
"github.com/luraproject/lura/v2/proxy"
)

func newProxyFactoryWithConfig(logger logging.Logger, backendFactory proxy.BackendFactory,
metricCollector *metrics.Metrics, serviceCfg *config.ServiceConfig) proxy.Factory {
func newProxyFactory(logger logging.Logger, backendFactory proxy.BackendFactory,
metricCollector *metrics.Metrics) proxy.Factory {

proxyFactory := proxy.NewDefaultFactory(backendFactory, logger)
proxyFactory = proxy.NewShadowFactory(proxyFactory)
Expand All @@ -27,36 +24,13 @@ func newProxyFactoryWithConfig(logger logging.Logger, backendFactory proxy.Backe
proxyFactory = lua.ProxyFactory(logger, proxyFactory)
proxyFactory = metricCollector.ProxyFactory("pipe", proxyFactory)
proxyFactory = opencensus.ProxyFactory(proxyFactory)

if serviceCfg != nil {
otelCfg, err := otelconfig.FromLura(*serviceCfg)
if err != nil {
if !errors.Is(err, otelconfig.ErrNoConfig) {
logger.Error(fmt.Sprintf("cannot load OpenTelemetry config: %s", err.Error()))
}
} else {
proxyFactory = otellura.ProxyFactory(proxyFactory, otelstate.GlobalState, otelCfg.Layers.Pipe,
otelCfg.SkipPaths)
}
}

proxyFactory = otellura.ProxyFactory(proxyFactory)
return proxyFactory
}

// NewProxyFactory returns a new ProxyFactory wrapping the injected BackendFactory with the default proxy stack and a metrics collector
func NewProxyFactory(logger logging.Logger, backendFactory proxy.BackendFactory, metricCollector *metrics.Metrics) proxy.Factory {
proxyFactory := newProxyFactoryWithConfig(logger, backendFactory, metricCollector, nil)

return proxy.FactoryFunc(func(cfg *config.EndpointConfig) (proxy.Proxy, error) {
logger.Debug(fmt.Sprintf("[ENDPOINT: %s] Building the proxy pipe", cfg.Endpoint))
return proxyFactory.New(cfg)
})
}

func newProxyFactoryWithConfigAndDbg(logger logging.Logger, backendFactory proxy.BackendFactory, metricCollector *metrics.Metrics,
serviceCfg *config.ServiceConfig) proxy.Factory {

proxyFactory := newProxyFactoryWithConfig(logger, backendFactory, metricCollector, serviceCfg)
proxyFactory := newProxyFactory(logger, backendFactory, metricCollector)

return proxy.FactoryFunc(func(cfg *config.EndpointConfig) (proxy.Proxy, error) {
logger.Debug(fmt.Sprintf("[ENDPOINT: %s] Building the proxy pipe", cfg.Endpoint))
Expand All @@ -69,9 +43,3 @@ type proxyFactory struct{}
func (proxyFactory) NewProxyFactory(logger logging.Logger, backendFactory proxy.BackendFactory, metricCollector *metrics.Metrics) proxy.Factory {
return NewProxyFactory(logger, backendFactory, metricCollector)
}

func (proxyFactory) NewProxyFactoryWithConfig(logger logging.Logger, backendFactory proxy.BackendFactory, metricCollector *metrics.Metrics,
serviceCfg *config.ServiceConfig) proxy.Factory {

return newProxyFactoryWithConfigAndDbg(logger, backendFactory, metricCollector, serviceCfg)
}