Skip to content

Commit

Permalink
Fix connector validation based on usage in pipelines
Browse files Browse the repository at this point in the history
Validation of connectors was too aggressive in that it failed if a connector
was used in any combination of unsupported roles. Instead, it should pass
validation as long as each use of the connector has a supported corresponding
use.

For example, the forward connector may forward traces and metrics at the same
time. Previously, validation would fail because it detected that traces->metrics
and metrics->traces were possible connections. Now it will pass as long as there
is a supported connection type for each pipeline in which the connector is used.
  • Loading branch information
djaglowski committed Jun 29, 2023
1 parent ec033ca commit c130a0b
Show file tree
Hide file tree
Showing 10 changed files with 291 additions and 269 deletions.
18 changes: 18 additions & 0 deletions .chloggen/connector-validation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: connector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix connector validation

# One or more tracking issues or pull requests related to the change
issues: [7892]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Validation of connectors was too aggressive such that a connector that was used in any combination of unsupported roles would fail.
Instead, validation should pass as long as each use of the connector has a supported corresponding use.
43 changes: 43 additions & 0 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type Factory interface {
LogsToMetricsStability() component.StabilityLevel
LogsToLogsStability() component.StabilityLevel

Stability(exporterType, receiverType component.Type) component.StabilityLevel

unexportedFactoryFunc()
}

Expand Down Expand Up @@ -437,6 +439,39 @@ func (f factory) LogsToLogsStability() component.StabilityLevel {
return f.logsToLogsStabilityLevel
}

func (f factory) Stability(expType, recType component.Type) component.StabilityLevel {
switch expType {
case component.DataTypeTraces:
switch recType {
case component.DataTypeTraces:
return f.tracesToTracesStabilityLevel
case component.DataTypeMetrics:
return f.tracesToMetricsStabilityLevel
case component.DataTypeLogs:
return f.tracesToLogsStabilityLevel
}
case component.DataTypeMetrics:
switch recType {
case component.DataTypeTraces:
return f.metricsToTracesStabilityLevel
case component.DataTypeMetrics:
return f.metricsToMetricsStabilityLevel
case component.DataTypeLogs:
return f.metricsToLogsStabilityLevel
}
case component.DataTypeLogs:
switch recType {
case component.DataTypeTraces:
return f.logsToTracesStabilityLevel
case component.DataTypeMetrics:
return f.logsToMetricsStabilityLevel
case component.DataTypeLogs:
return f.logsToLogsStabilityLevel
}
}
return component.StabilityLevelUndefined
}

// NewFactory returns a Factory.
func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory {
f := &factory{
Expand Down Expand Up @@ -473,6 +508,14 @@ func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.
return &Builder{cfgs: cfgs, factories: factories}
}

func (b *Builder) SupportsConnection(connType, expType, recType component.Type) bool {
f, exists := b.factories[connType]
if !exists {
return false
}
return f.Stability(expType, recType) != component.StabilityLevelUndefined
}

// CreateTracesToTraces creates a Traces connector based on the settings and config.
func (b *Builder) CreateTracesToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) {
cfg, existsCfg := b.cfgs[set.ID]
Expand Down
43 changes: 24 additions & 19 deletions connector/connectortest/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,57 +26,62 @@ func NewNopCreateSettings() connector.CreateSettings {
type nopConfig struct{}

// NewNopFactory returns a connector.Factory that constructs nop processors.
func NewNopFactory() connector.Factory {
func NewNopFactory(opts ...connector.FactoryOption) connector.Factory {
if len(opts) == 0 {
opts = []connector.FactoryOption{
connector.WithTracesToTraces(CreateTracesToTracesConnector, component.StabilityLevelDevelopment),
connector.WithTracesToMetrics(CreateTracesToMetricsConnector, component.StabilityLevelDevelopment),
connector.WithTracesToLogs(CreateTracesToLogsConnector, component.StabilityLevelDevelopment),
connector.WithMetricsToTraces(CreateMetricsToTracesConnector, component.StabilityLevelDevelopment),
connector.WithMetricsToMetrics(CreateMetricsToMetricsConnector, component.StabilityLevelDevelopment),
connector.WithMetricsToLogs(CreateMetricsToLogsConnector, component.StabilityLevelDevelopment),
connector.WithLogsToTraces(CreateLogsToTracesConnector, component.StabilityLevelDevelopment),
connector.WithLogsToMetrics(CreateLogsToMetricsConnector, component.StabilityLevelDevelopment),
connector.WithLogsToLogs(CreateLogsToLogsConnector, component.StabilityLevelDevelopment),
}
}
return connector.NewFactory(
"nop",
func() component.Config {
return &nopConfig{}
},
connector.WithTracesToTraces(createTracesToTracesConnector, component.StabilityLevelDevelopment),
connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelDevelopment),
connector.WithTracesToLogs(createTracesToLogsConnector, component.StabilityLevelDevelopment),
connector.WithMetricsToTraces(createMetricsToTracesConnector, component.StabilityLevelDevelopment),
connector.WithMetricsToMetrics(createMetricsToMetricsConnector, component.StabilityLevelDevelopment),
connector.WithMetricsToLogs(createMetricsToLogsConnector, component.StabilityLevelDevelopment),
connector.WithLogsToTraces(createLogsToTracesConnector, component.StabilityLevelDevelopment),
connector.WithLogsToMetrics(createLogsToMetricsConnector, component.StabilityLevelDevelopment),
connector.WithLogsToLogs(createLogsToLogsConnector, component.StabilityLevelDevelopment),
opts...,
)
}

func createTracesToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Traces, error) {
func CreateTracesToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Traces, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

func createTracesToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Traces, error) {
func CreateTracesToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Traces, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

func createTracesToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Traces, error) {
func CreateTracesToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Traces, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

func createMetricsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Metrics, error) {
func CreateMetricsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Metrics, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

func createMetricsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Metrics, error) {
func CreateMetricsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Metrics, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

func createMetricsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Metrics, error) {
func CreateMetricsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Metrics, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

func createLogsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Logs, error) {
func CreateLogsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Logs, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

func createLogsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Logs, error) {
func CreateLogsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Logs, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

func createLogsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Logs, error) {
func CreateLogsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Logs, error) {
return &nopConnector{Consumer: consumertest.NewNop()}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions otelcol/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
return fmt.Errorf("failed to get config: %w", err)
}

if err = cfg.Validate(); err != nil {
if err = cfg.Validate(col.set.Factories); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}

Expand Down Expand Up @@ -198,7 +198,7 @@ func (col *Collector) DryRun(ctx context.Context) error {
return fmt.Errorf("failed to get config: %w", err)
}

return cfg.Validate()
return cfg.Validate(col.set.Factories)
}

// Run starts the collector according to the given configuration, and waits for it to complete.
Expand Down
106 changes: 98 additions & 8 deletions otelcol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/service"
)

Expand Down Expand Up @@ -41,7 +42,7 @@ type Config struct {
// This function performs basic validation of configuration. There may be more subtle
// invalid cases that we currently don't check for but which we may want to add in
// the future (e.g. disallowing receiving and exporting on the same endpoint).
func (cfg *Config) Validate() error {
func (cfg *Config) Validate(factories Factories) error {
// Currently, there is no default receiver enabled.
// The configuration must specify at least one receiver to be valid.
if len(cfg.Receivers) == 0 {
Expand Down Expand Up @@ -108,9 +109,9 @@ func (cfg *Config) Validate() error {
}
}

// Keep track of whether connectors are used as receivers and exporters
connectorsAsReceivers := make(map[component.ID]struct{}, len(cfg.Connectors))
connectorsAsExporters := make(map[component.ID]struct{}, len(cfg.Connectors))
// Keep track of whether connectors are used as receivers and exporters using map[connectorID][]pipelineID
connectorsAsExporters := make(map[component.ID][]component.ID, len(cfg.Connectors))
connectorsAsReceivers := make(map[component.ID][]component.ID, len(cfg.Connectors))

// Check that all pipelines reference only configured components.
for pipelineID, pipeline := range cfg.Service.Pipelines {
Expand All @@ -122,7 +123,7 @@ func (cfg *Config) Validate() error {
}

if _, ok := cfg.Connectors[ref]; ok {
connectorsAsReceivers[ref] = struct{}{}
connectorsAsReceivers[ref] = append(connectorsAsReceivers[ref], pipelineID)
continue
}
return fmt.Errorf("service::pipelines::%s: references receiver %q which is not configured", pipelineID, ref)
Expand All @@ -143,7 +144,7 @@ func (cfg *Config) Validate() error {
continue
}
if _, ok := cfg.Connectors[ref]; ok {
connectorsAsExporters[ref] = struct{}{}
connectorsAsExporters[ref] = append(connectorsAsExporters[ref], pipelineID)
continue
}
return fmt.Errorf("service::pipelines::%s: references exporter %q which is not configured", pipelineID, ref)
Expand All @@ -152,14 +153,103 @@ func (cfg *Config) Validate() error {

// Validate that connectors are used as both receiver and exporter
for connID := range cfg.Connectors {
_, recOK := connectorsAsReceivers[connID]
_, expOK := connectorsAsExporters[connID]
// TODO must use connector factories to assess validity
expPipelines, expOK := connectorsAsExporters[connID]
recPipelines, recOK := connectorsAsReceivers[connID]
if recOK && !expOK {
return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as exporter", connID)
}
if !recOK && expOK {
return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as receiver", connID)
}
factory := factories.Connectors[connID.Type()]
if err := validateConnectorUse(factory, connID, expPipelines, recPipelines); err != nil {
return err
}
}

return nil
}

// Given the set of pipelines in which a connector is used as an exporter and receiver,
// we need to validate that all pipelines are using the connector in a supported way.
//
// For example, consider the following config, in which both logs and metrics are forwarded:
//
// pipelines:
// logs/in:
// receivers: [otlp]
// exporters: [forward]
// logs/out:
// receivers: [forward]
// exporters: [otlp]
// metrics/in:
// receivers: [otlp]
// exporters: [forward]
// metrics/out:
// receivers: [forward]
// exporters: [otlp]
//
// When validating this configuration, we look at each use of the connector and confirm that there
// is a valid corresponding use in another pipeline:
// - As an exporter in logs/in: Valid, because it has a corresponding use as a receiver in logs/out
// - As a receiver in logs/out: Valid, because it has a corresponding use as an exporter in logs/in
// - As an exporter in metrics/in: Valid, because it has a corresponding use as a receiver in metrics/out
// - As a receiver in metrics/out: Valid, because it has a corresponding use as an exporter in metrics/in
// We conclude that it is used correctly, because no uses are unconnected via a supported combination.
//
// Now consider the following config, in which we validation should fail:
//
// pipelines:
// traces/in:
// receivers: [otlp]
// exporters: [forward]
// traces/out:
// receivers: [forward]
// exporters: [otlp]
// metrics/in:
// receivers: [otlp]
// exporters: [forward]
//
// When validating this configuration, we look at each use of the connector and find:
// - As an exporter in traces/in: Valid, because it has a corresponding use as a receiver in traces/out
// - As a receiver in traces/out: Valid, because it has a corresponding use as an exporter in traces/in
// - As an exporter in metrics/in: *Invalid*, because it has a corresponding use as a receiver in a metrics pipeline
// We conclude that it is used incorrectly, because at least one use is unconnected via a supported combination.
func validateConnectorUse(factory connector.Factory, connID component.ID, expPipelines, recPipelines []component.ID) error {
expTypes := make(map[component.DataType]bool)
for _, pipelineID := range expPipelines {
// The presence of each key indicates how the connector is used as an exporter.
// The value is initially set to false. Later we will set the value to true *if* we
// confirm that there is a supported corresponding use as a receiver.
expTypes[pipelineID.Type()] = false
}
recTypes := make(map[component.DataType]bool)
for _, pipelineID := range recPipelines {
// The presence of each key indicates how the connector is used as a receiver.
// The value is initially set to false. Later we will set the value to true *if* we
// confirm that there is a supported corresponding use as an exporter.
recTypes[pipelineID.Type()] = false
}

for expType := range expTypes {
for recType := range recTypes {
if factory.Stability(expType, recType) != component.StabilityLevelUndefined {
expTypes[expType] = true
recTypes[recType] = true
}
}
}

for expType, supportedUse := range expTypes {
if !supportedUse {
return fmt.Errorf("connectors::%s: is used as exporter in %q pipeline but is not used in supported receiver pipeline", connID, expType)
}
}
for recType, supportedUse := range recTypes {
if !supportedUse {
return fmt.Errorf("connectors::%s: is used as receiver in %q pipeline but is not used in supported exporter pipeline", connID, recType)
}
}

return nil
Expand Down
Loading

0 comments on commit c130a0b

Please sign in to comment.