Skip to content

Commit

Permalink
Fix connector validation based on usage in pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Jun 29, 2023
1 parent c130a0b commit f735a61
Show file tree
Hide file tree
Showing 9 changed files with 475 additions and 284 deletions.
43 changes: 0 additions & 43 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,6 @@ type Factory interface {
LogsToMetricsStability() component.StabilityLevel
LogsToLogsStability() component.StabilityLevel

Stability(exporterType, receiverType component.Type) component.StabilityLevel

unexportedFactoryFunc()
}

Expand Down Expand Up @@ -439,39 +437,6 @@ func (f factory) LogsToLogsStability() component.StabilityLevel {
return f.logsToLogsStabilityLevel
}

func (f factory) Stability(expType, recType component.Type) component.StabilityLevel {
switch expType {
case component.DataTypeTraces:
switch recType {
case component.DataTypeTraces:
return f.tracesToTracesStabilityLevel
case component.DataTypeMetrics:
return f.tracesToMetricsStabilityLevel
case component.DataTypeLogs:
return f.tracesToLogsStabilityLevel
}
case component.DataTypeMetrics:
switch recType {
case component.DataTypeTraces:
return f.metricsToTracesStabilityLevel
case component.DataTypeMetrics:
return f.metricsToMetricsStabilityLevel
case component.DataTypeLogs:
return f.metricsToLogsStabilityLevel
}
case component.DataTypeLogs:
switch recType {
case component.DataTypeTraces:
return f.logsToTracesStabilityLevel
case component.DataTypeMetrics:
return f.logsToMetricsStabilityLevel
case component.DataTypeLogs:
return f.logsToLogsStabilityLevel
}
}
return component.StabilityLevelUndefined
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down Expand Up @@ -508,14 +473,6 @@ func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.
return &Builder{cfgs: cfgs, factories: factories}
}

func (b *Builder) SupportsConnection(connType, expType, recType component.Type) bool {
f, exists := b.factories[connType]
if !exists {
return false
}
return f.Stability(expType, recType) != component.StabilityLevelUndefined
}

// CreateTracesToTraces creates a Traces connector based on the settings and config.
func (b *Builder) CreateTracesToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) {
cfg, existsCfg := b.cfgs[set.ID]
Expand Down
4 changes: 2 additions & 2 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return fmt.Errorf("failed to get config: %w", err)
}

if err = cfg.Validate(col.set.Factories); err != nil {
if err = cfg.Validate(); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}

Expand Down Expand Up @@ -198,7 +198,7 @@ func (col *Collector) DryRun(ctx context.Context) error {
return fmt.Errorf("failed to get config: %w", err)
}

return cfg.Validate(col.set.Factories)
return cfg.Validate()
}

// Run starts the collector according to the given configuration, and waits for it to complete.
Expand Down
111 changes: 1 addition & 110 deletions otelcol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/service"
)

Expand Down Expand Up @@ -42,7 +41,7 @@ type Config struct {
// This function performs basic validation of configuration. There may be more subtle
// invalid cases that we currently don't check for but which we may want to add in
// the future (e.g. disallowing receiving and exporting on the same endpoint).
func (cfg *Config) Validate(factories Factories) error {
func (cfg *Config) Validate() error {
// Currently, there is no default receiver enabled.
// The configuration must specify at least one receiver to be valid.
if len(cfg.Receivers) == 0 {
Expand Down Expand Up @@ -109,10 +108,6 @@ func (cfg *Config) Validate(factories Factories) error {
}
}

// Keep track of whether connectors are used as receivers and exporters using map[connectorID][]pipelineID
connectorsAsExporters := make(map[component.ID][]component.ID, len(cfg.Connectors))
connectorsAsReceivers := make(map[component.ID][]component.ID, len(cfg.Connectors))

// Check that all pipelines reference only configured components.
for pipelineID, pipeline := range cfg.Service.Pipelines {
// Validate pipeline receiver name references.
Expand All @@ -123,7 +118,6 @@ func (cfg *Config) Validate(factories Factories) error {
}

if _, ok := cfg.Connectors[ref]; ok {
connectorsAsReceivers[ref] = append(connectorsAsReceivers[ref], pipelineID)
continue
}
return fmt.Errorf("service::pipelines::%s: references receiver %q which is not configured", pipelineID, ref)
Expand All @@ -144,113 +138,10 @@ func (cfg *Config) Validate(factories Factories) error {
continue
}
if _, ok := cfg.Connectors[ref]; ok {
connectorsAsExporters[ref] = append(connectorsAsExporters[ref], pipelineID)
continue
}
return fmt.Errorf("service::pipelines::%s: references exporter %q which is not configured", pipelineID, ref)
}
}

// Validate that connectors are used as both receiver and exporter
for connID := range cfg.Connectors {
// TODO must use connector factories to assess validity
expPipelines, expOK := connectorsAsExporters[connID]
recPipelines, recOK := connectorsAsReceivers[connID]
if recOK && !expOK {
return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as exporter", connID)
}
if !recOK && expOK {
return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as receiver", connID)
}
factory := factories.Connectors[connID.Type()]
if err := validateConnectorUse(factory, connID, expPipelines, recPipelines); err != nil {
return err
}
}

return nil
}

// Given the set of pipelines in which a connector is used as an exporter and receiver,
// we need to validate that all pipelines are using the connector in a supported way.
//
// For example, consider the following config, in which both logs and metrics are forwarded:
//
// pipelines:
// logs/in:
// receivers: [otlp]
// exporters: [forward]
// logs/out:
// receivers: [forward]
// exporters: [otlp]
// metrics/in:
// receivers: [otlp]
// exporters: [forward]
// metrics/out:
// receivers: [forward]
// exporters: [otlp]
//
// When validating this configuration, we look at each use of the connector and confirm that there
// is a valid corresponding use in another pipeline:
// - As an exporter in logs/in: Valid, because it has a corresponding use as a receiver in logs/out
// - As a receiver in logs/out: Valid, because it has a corresponding use as an exporter in logs/in
// - As an exporter in metrics/in: Valid, because it has a corresponding use as a receiver in metrics/out
// - As a receiver in metrics/out: Valid, because it has a corresponding use as an exporter in metrics/in
// We conclude that it is used correctly, because no uses are unconnected via a supported combination.
//
// Now consider the following config, in which we validation should fail:
//
// pipelines:
// traces/in:
// receivers: [otlp]
// exporters: [forward]
// traces/out:
// receivers: [forward]
// exporters: [otlp]
// metrics/in:
// receivers: [otlp]
// exporters: [forward]
//
// When validating this configuration, we look at each use of the connector and find:
// - As an exporter in traces/in: Valid, because it has a corresponding use as a receiver in traces/out
// - As a receiver in traces/out: Valid, because it has a corresponding use as an exporter in traces/in
// - As an exporter in metrics/in: *Invalid*, because it has a corresponding use as a receiver in a metrics pipeline
// We conclude that it is used incorrectly, because at least one use is unconnected via a supported combination.
func validateConnectorUse(factory connector.Factory, connID component.ID, expPipelines, recPipelines []component.ID) error {
expTypes := make(map[component.DataType]bool)
for _, pipelineID := range expPipelines {
// The presence of each key indicates how the connector is used as an exporter.
// The value is initially set to false. Later we will set the value to true *if* we
// confirm that there is a supported corresponding use as a receiver.
expTypes[pipelineID.Type()] = false
}
recTypes := make(map[component.DataType]bool)
for _, pipelineID := range recPipelines {
// The presence of each key indicates how the connector is used as a receiver.
// The value is initially set to false. Later we will set the value to true *if* we
// confirm that there is a supported corresponding use as an exporter.
recTypes[pipelineID.Type()] = false
}

for expType := range expTypes {
for recType := range recTypes {
if factory.Stability(expType, recType) != component.StabilityLevelUndefined {
expTypes[expType] = true
recTypes[recType] = true
}
}
}

for expType, supportedUse := range expTypes {
if !supportedUse {
return fmt.Errorf("connectors::%s: is used as exporter in %q pipeline but is not used in supported receiver pipeline", connID, expType)
}
}
for recType, supportedUse := range recTypes {
if !supportedUse {
return fmt.Errorf("connectors::%s: is used as receiver in %q pipeline but is not used in supported exporter pipeline", connID, recType)
}
}

return nil
}
120 changes: 8 additions & 112 deletions otelcol/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -213,103 +212,6 @@ func TestConfigValidate(t *testing.T) {
},
expected: errors.New(`service::pipelines::traces: references exporter "nop/conn2" which is not configured`),
},
{
name: "missing-connector-as-receiver",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "conn"))
return cfg
},
expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as receiver`),
},
{
name: "missing-connector-as-exporter",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "conn"))
return cfg
},
expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as exporter`),
},
{
name: "orphaned-connector-use-as-exporter",
cfgFn: func() *Config {
cfg := generateConfigWithPipelines(pipelines.Config{
component.NewIDWithName("traces", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewIDWithName("nop", "conn")},
},
component.NewIDWithName("traces", "out"): {
Receivers: []component.ID{component.NewIDWithName("nop", "conn")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewIDWithName("metrics", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewIDWithName("nop", "conn")},
},
})
return cfg
},
expected: errors.New(`connectors::nop/conn: is used as exporter in "metrics" pipeline but is not used in supported receiver pipeline`),
},
{
name: "orphaned-connector-use-as-receiver",
cfgFn: func() *Config {
cfg := generateConfigWithPipelines(pipelines.Config{
component.NewIDWithName("metrics", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewIDWithName("nop", "conn")},
},
component.NewIDWithName("metrics", "out"): {
Receivers: []component.ID{component.NewIDWithName("nop", "conn")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewIDWithName("traces", "out"): {
Receivers: []component.ID{component.NewIDWithName("nop", "conn")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
})
return cfg
},
expected: errors.New(`connectors::nop/conn: is used as receiver in "traces" pipeline but is not used in supported exporter pipeline`),
},
{
name: "connector-forward",
cfgFn: func() *Config {
cfg := generateConfigWithPipelines(pipelines.Config{
component.NewIDWithName("metrics", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewIDWithName("nop", "conn")},
},
component.NewIDWithName("metrics", "out"): {
Receivers: []component.ID{component.NewIDWithName("nop", "conn")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
component.NewIDWithName("traces", "in"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewIDWithName("nop", "conn")},
},
component.NewIDWithName("traces", "out"): {
Receivers: []component.ID{component.NewIDWithName("nop", "conn")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
})
return cfg
},
expected: nil,
},
{
name: "invalid-service-config",
cfgFn: func() *Config {
Expand All @@ -324,24 +226,12 @@ func TestConfigValidate(t *testing.T) {
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
cfg := test.cfgFn()
factories, err := nopFactories()
require.NoError(t, err)
assert.Equal(t, test.expected, cfg.Validate(factories))
assert.Equal(t, test.expected, cfg.Validate())
})
}
}

func generateConfig() *Config {
return generateConfigWithPipelines(pipelines.Config{
component.NewID("traces"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
})
}

func generateConfigWithPipelines(pipes pipelines.Config) *Config {
return &Config{
Receivers: map[component.ID]component.Config{
component.NewID("nop"): &errConfig{},
Expand Down Expand Up @@ -376,7 +266,13 @@ func generateConfigWithPipelines(pipes pipelines.Config) *Config {
},
},
Extensions: []component.ID{component.NewID("nop")},
Pipelines: pipes,
Pipelines: pipelines.Config{
component.NewID("traces"): {
Receivers: []component.ID{component.NewID("nop")},
Processors: []component.ID{component.NewID("nop")},
Exporters: []component.ID{component.NewID("nop")},
},
},
},
}
}
Loading

0 comments on commit f735a61

Please sign in to comment.