Skip to content

Commit

Permalink
[chore][routingprocessor] Make exporters registration more generic.
Browse files Browse the repository at this point in the history
  • Loading branch information
kovrus committed Aug 23, 2022
1 parent 74e0dd3 commit 712240e
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 221 deletions.
48 changes: 38 additions & 10 deletions processor/routingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ var (

type processorImp struct {
logger *zap.Logger
router *router

metricsRouter router[component.MetricsExporter]
logsRouter router[component.LogsExporter]
tracesRouter router[component.TracesExporter]
}

// newProcessor creates new processor
Expand All @@ -55,25 +58,50 @@ func newProcessor(logger *zap.Logger, cfg config.Processor) *processorImp {
oCfg := cfg.(*Config)

return &processorImp{
logger: logger,
router: newRouter(*oCfg, logger),
logger: logger,
metricsRouter: newRouter[component.MetricsExporter](*oCfg, logger),
logsRouter: newRouter[component.LogsExporter](*oCfg, logger),
tracesRouter: newRouter[component.TracesExporter](*oCfg, logger),
}
}

func (e *processorImp) Start(_ context.Context, host component.Host) error {
return e.router.registerExporters(host.GetExporters())
exporters := host.GetExporters()

err := e.metricsRouter.RegisterExportersForType(exporters, config.MetricsDataType)
if err != nil {
return err
}
err = e.logsRouter.RegisterExportersForType(exporters, config.LogsDataType)
if err != nil {
return err
}
err = e.tracesRouter.RegisterExportersForType(exporters, config.TracesDataType)
if err != nil {
return err
}
if len(e.tracesRouter.exporters) == 0 &&
len(e.tracesRouter.defaultExporters) == 0 &&
len(e.metricsRouter.exporters) == 0 &&
len(e.metricsRouter.defaultExporters) == 0 &&
len(e.logsRouter.exporters) == 0 &&
len(e.logsRouter.defaultExporters) == 0 {
return errNoExportersAfterRegistration
}

return nil
}

func (e *processorImp) Shutdown(context.Context) error {
return nil
}

func (e *processorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
routedTraces := e.router.RouteTraces(ctx, td)
routedTraces := e.tracesRouter.RouteTraces(ctx, td)
for _, rt := range routedTraces {
for _, exp := range rt.exporters {
// TODO: determine the proper action when errors happen
if err := exp.ConsumeTraces(ctx, rt.traces); err != nil {
if err := exp.ConsumeTraces(ctx, rt.signal); err != nil {
return err
}
}
Expand All @@ -83,11 +111,11 @@ func (e *processorImp) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
}

func (e *processorImp) ConsumeMetrics(ctx context.Context, tm pmetric.Metrics) error {
routedMetrics := e.router.RouteMetrics(ctx, tm)
routedMetrics := e.metricsRouter.RouteMetrics(ctx, tm)
for _, rm := range routedMetrics {
for _, exp := range rm.exporters {
// TODO: determine the proper action when errors happen
if err := exp.ConsumeMetrics(ctx, rm.metrics); err != nil {
if err := exp.ConsumeMetrics(ctx, rm.signal); err != nil {
return err
}
}
Expand All @@ -97,11 +125,11 @@ func (e *processorImp) ConsumeMetrics(ctx context.Context, tm pmetric.Metrics) e
}

func (e *processorImp) ConsumeLogs(ctx context.Context, tl plog.Logs) error {
routedLogs := e.router.RouteLogs(ctx, tl)
routedLogs := e.logsRouter.RouteLogs(ctx, tl)
for _, rl := range routedLogs {
for _, exp := range rl.exporters {
// TODO: determine the proper action when errors happen
if err := exp.ConsumeLogs(ctx, rl.logs); err != nil {
if err := exp.ConsumeLogs(ctx, rl.signal); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion processor/routingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestTraces_RegisterExportersForValidRoute(t *testing.T) {
require.NoError(t, exp.Start(context.Background(), host))

// verify
assert.Contains(t, exp.router.tracesExporters["acme"], otlpExp)
assert.Contains(t, exp.tracesRouter.exporters["acme"], otlpExp)
}

func TestTraces_ErrorRequestedExporterNotFoundForRoute(t *testing.T) {
Expand Down
Loading

0 comments on commit 712240e

Please sign in to comment.