From c130a0b075023488b6838361f40ca5b68bfdc045 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 20 Jun 2023 16:55:00 -0400 Subject: [PATCH 1/4] Fix connector validation based on usage in pipelines Validation of connectors was too aggressive in that it failed if a connector was used in any combination of unsupported roles. Instead, it should pass validation as long as each use of the connector has a supported corresponding use. For example, the forward connector may forward traces and metrics at the same time. Previously, validation would fail because it detected that traces->metrics and metrics->traces were possible connections. Now it will pass as long as there is a supported connection type for each pipeline in which the connector is used. --- .chloggen/connector-validation.yaml | 18 +++ connector/connector.go | 43 +++++ connector/connectortest/connector.go | 43 ++--- otelcol/collector.go | 4 +- otelcol/config.go | 106 +++++++++++- otelcol/config_test.go | 100 +++++++++++- otelcol/factories_test.go | 11 +- otelcol/otelcoltest/config.go | 2 +- service/internal/graph/graph.go | 3 + service/internal/graph/graph_test.go | 230 --------------------------- 10 files changed, 291 insertions(+), 269 deletions(-) create mode 100755 .chloggen/connector-validation.yaml diff --git a/.chloggen/connector-validation.yaml b/.chloggen/connector-validation.yaml new file mode 100755 index 00000000000..c270893e511 --- /dev/null +++ b/.chloggen/connector-validation.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: connector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix connector validation + +# One or more tracking issues or pull requests related to the change +issues: [7892] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Validation of connectors was too aggressive such that a connector that was used in any combination of unsupported roles would fail. + Instead, validation should pass as long as each use of the connector has a supported corresponding use. diff --git a/connector/connector.go b/connector/connector.go index 3a85add44a6..4d3eab5f56d 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -131,6 +131,8 @@ type Factory interface { LogsToMetricsStability() component.StabilityLevel LogsToLogsStability() component.StabilityLevel + Stability(exporterType, receiverType component.Type) component.StabilityLevel + unexportedFactoryFunc() } @@ -437,6 +439,39 @@ func (f factory) LogsToLogsStability() component.StabilityLevel { return f.logsToLogsStabilityLevel } +func (f factory) Stability(expType, recType component.Type) component.StabilityLevel { + switch expType { + case component.DataTypeTraces: + switch recType { + case component.DataTypeTraces: + return f.tracesToTracesStabilityLevel + case component.DataTypeMetrics: + return f.tracesToMetricsStabilityLevel + case component.DataTypeLogs: + return f.tracesToLogsStabilityLevel + } + case component.DataTypeMetrics: + switch recType { + case component.DataTypeTraces: + return f.metricsToTracesStabilityLevel + case component.DataTypeMetrics: + return f.metricsToMetricsStabilityLevel + case component.DataTypeLogs: + return f.metricsToLogsStabilityLevel + } + case component.DataTypeLogs: + switch recType { + case component.DataTypeTraces: + return f.logsToTracesStabilityLevel + case component.DataTypeMetrics: + return f.logsToMetricsStabilityLevel + case component.DataTypeLogs: + return f.logsToLogsStabilityLevel + } + } + return component.StabilityLevelUndefined +} + // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { f := &factory{ @@ -473,6 +508,14 @@ func NewBuilder(cfgs map[component.ID]component.Config, factories map[component. return &Builder{cfgs: cfgs, factories: factories} } +func (b *Builder) SupportsConnection(connType, expType, recType component.Type) bool { + f, exists := b.factories[connType] + if !exists { + return false + } + return f.Stability(expType, recType) != component.StabilityLevelUndefined +} + // CreateTracesToTraces creates a Traces connector based on the settings and config. func (b *Builder) CreateTracesToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) { cfg, existsCfg := b.cfgs[set.ID] diff --git a/connector/connectortest/connector.go b/connector/connectortest/connector.go index 137a353423c..0b85178c7bc 100644 --- a/connector/connectortest/connector.go +++ b/connector/connectortest/connector.go @@ -26,57 +26,62 @@ func NewNopCreateSettings() connector.CreateSettings { type nopConfig struct{} // NewNopFactory returns a connector.Factory that constructs nop processors. -func NewNopFactory() connector.Factory { +func NewNopFactory(opts ...connector.FactoryOption) connector.Factory { + if len(opts) == 0 { + opts = []connector.FactoryOption{ + connector.WithTracesToTraces(CreateTracesToTracesConnector, component.StabilityLevelDevelopment), + connector.WithTracesToMetrics(CreateTracesToMetricsConnector, component.StabilityLevelDevelopment), + connector.WithTracesToLogs(CreateTracesToLogsConnector, component.StabilityLevelDevelopment), + connector.WithMetricsToTraces(CreateMetricsToTracesConnector, component.StabilityLevelDevelopment), + connector.WithMetricsToMetrics(CreateMetricsToMetricsConnector, component.StabilityLevelDevelopment), + connector.WithMetricsToLogs(CreateMetricsToLogsConnector, component.StabilityLevelDevelopment), + connector.WithLogsToTraces(CreateLogsToTracesConnector, component.StabilityLevelDevelopment), + connector.WithLogsToMetrics(CreateLogsToMetricsConnector, component.StabilityLevelDevelopment), + connector.WithLogsToLogs(CreateLogsToLogsConnector, component.StabilityLevelDevelopment), + } + } return connector.NewFactory( "nop", func() component.Config { return &nopConfig{} }, - connector.WithTracesToTraces(createTracesToTracesConnector, component.StabilityLevelDevelopment), - connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelDevelopment), - connector.WithTracesToLogs(createTracesToLogsConnector, component.StabilityLevelDevelopment), - connector.WithMetricsToTraces(createMetricsToTracesConnector, component.StabilityLevelDevelopment), - connector.WithMetricsToMetrics(createMetricsToMetricsConnector, component.StabilityLevelDevelopment), - connector.WithMetricsToLogs(createMetricsToLogsConnector, component.StabilityLevelDevelopment), - connector.WithLogsToTraces(createLogsToTracesConnector, component.StabilityLevelDevelopment), - connector.WithLogsToMetrics(createLogsToMetricsConnector, component.StabilityLevelDevelopment), - connector.WithLogsToLogs(createLogsToLogsConnector, component.StabilityLevelDevelopment), + opts..., ) } -func createTracesToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Traces, error) { +func CreateTracesToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Traces, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createTracesToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Traces, error) { +func CreateTracesToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Traces, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createTracesToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Traces, error) { +func CreateTracesToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Traces, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createMetricsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Metrics, error) { +func CreateMetricsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Metrics, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createMetricsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Metrics, error) { +func CreateMetricsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Metrics, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createMetricsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Metrics, error) { +func CreateMetricsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Metrics, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createLogsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Logs, error) { +func CreateLogsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Logs, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createLogsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Logs, error) { +func CreateLogsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Logs, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func createLogsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Logs, error) { +func CreateLogsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Logs, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } diff --git a/otelcol/collector.go b/otelcol/collector.go index 788c022e9f2..dcde4c53ad0 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -148,7 +148,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return fmt.Errorf("failed to get config: %w", err) } - if err = cfg.Validate(); err != nil { + if err = cfg.Validate(col.set.Factories); err != nil { return fmt.Errorf("invalid configuration: %w", err) } @@ -198,7 +198,7 @@ func (col *Collector) DryRun(ctx context.Context) error { return fmt.Errorf("failed to get config: %w", err) } - return cfg.Validate() + return cfg.Validate(col.set.Factories) } // Run starts the collector according to the given configuration, and waits for it to complete. diff --git a/otelcol/config.go b/otelcol/config.go index 31e51a1dafe..32eb9ba666f 100644 --- a/otelcol/config.go +++ b/otelcol/config.go @@ -8,6 +8,7 @@ import ( "fmt" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/service" ) @@ -41,7 +42,7 @@ type Config struct { // This function performs basic validation of configuration. There may be more subtle // invalid cases that we currently don't check for but which we may want to add in // the future (e.g. disallowing receiving and exporting on the same endpoint). -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(factories Factories) error { // Currently, there is no default receiver enabled. // The configuration must specify at least one receiver to be valid. if len(cfg.Receivers) == 0 { @@ -108,9 +109,9 @@ func (cfg *Config) Validate() error { } } - // Keep track of whether connectors are used as receivers and exporters - connectorsAsReceivers := make(map[component.ID]struct{}, len(cfg.Connectors)) - connectorsAsExporters := make(map[component.ID]struct{}, len(cfg.Connectors)) + // Keep track of whether connectors are used as receivers and exporters using map[connectorID][]pipelineID + connectorsAsExporters := make(map[component.ID][]component.ID, len(cfg.Connectors)) + connectorsAsReceivers := make(map[component.ID][]component.ID, len(cfg.Connectors)) // Check that all pipelines reference only configured components. for pipelineID, pipeline := range cfg.Service.Pipelines { @@ -122,7 +123,7 @@ func (cfg *Config) Validate() error { } if _, ok := cfg.Connectors[ref]; ok { - connectorsAsReceivers[ref] = struct{}{} + connectorsAsReceivers[ref] = append(connectorsAsReceivers[ref], pipelineID) continue } return fmt.Errorf("service::pipelines::%s: references receiver %q which is not configured", pipelineID, ref) @@ -143,7 +144,7 @@ func (cfg *Config) Validate() error { continue } if _, ok := cfg.Connectors[ref]; ok { - connectorsAsExporters[ref] = struct{}{} + connectorsAsExporters[ref] = append(connectorsAsExporters[ref], pipelineID) continue } return fmt.Errorf("service::pipelines::%s: references exporter %q which is not configured", pipelineID, ref) @@ -152,14 +153,103 @@ func (cfg *Config) Validate() error { // Validate that connectors are used as both receiver and exporter for connID := range cfg.Connectors { - _, recOK := connectorsAsReceivers[connID] - _, expOK := connectorsAsExporters[connID] + // TODO must use connector factories to assess validity + expPipelines, expOK := connectorsAsExporters[connID] + recPipelines, recOK := connectorsAsReceivers[connID] if recOK && !expOK { return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as exporter", connID) } if !recOK && expOK { return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as receiver", connID) } + factory := factories.Connectors[connID.Type()] + if err := validateConnectorUse(factory, connID, expPipelines, recPipelines); err != nil { + return err + } + } + + return nil +} + +// Given the set of pipelines in which a connector is used as an exporter and receiver, +// we need to validate that all pipelines are using the connector in a supported way. +// +// For example, consider the following config, in which both logs and metrics are forwarded: +// +// pipelines: +// logs/in: +// receivers: [otlp] +// exporters: [forward] +// logs/out: +// receivers: [forward] +// exporters: [otlp] +// metrics/in: +// receivers: [otlp] +// exporters: [forward] +// metrics/out: +// receivers: [forward] +// exporters: [otlp] +// +// When validating this configuration, we look at each use of the connector and confirm that there +// is a valid corresponding use in another pipeline: +// - As an exporter in logs/in: Valid, because it has a corresponding use as a receiver in logs/out +// - As a receiver in logs/out: Valid, because it has a corresponding use as an exporter in logs/in +// - As an exporter in metrics/in: Valid, because it has a corresponding use as a receiver in metrics/out +// - As a receiver in metrics/out: Valid, because it has a corresponding use as an exporter in metrics/in +// We conclude that it is used correctly, because no uses are unconnected via a supported combination. +// +// Now consider the following config, in which we validation should fail: +// +// pipelines: +// traces/in: +// receivers: [otlp] +// exporters: [forward] +// traces/out: +// receivers: [forward] +// exporters: [otlp] +// metrics/in: +// receivers: [otlp] +// exporters: [forward] +// +// When validating this configuration, we look at each use of the connector and find: +// - As an exporter in traces/in: Valid, because it has a corresponding use as a receiver in traces/out +// - As a receiver in traces/out: Valid, because it has a corresponding use as an exporter in traces/in +// - As an exporter in metrics/in: *Invalid*, because it has a corresponding use as a receiver in a metrics pipeline +// We conclude that it is used incorrectly, because at least one use is unconnected via a supported combination. +func validateConnectorUse(factory connector.Factory, connID component.ID, expPipelines, recPipelines []component.ID) error { + expTypes := make(map[component.DataType]bool) + for _, pipelineID := range expPipelines { + // The presence of each key indicates how the connector is used as an exporter. + // The value is initially set to false. Later we will set the value to true *if* we + // confirm that there is a supported corresponding use as a receiver. + expTypes[pipelineID.Type()] = false + } + recTypes := make(map[component.DataType]bool) + for _, pipelineID := range recPipelines { + // The presence of each key indicates how the connector is used as a receiver. + // The value is initially set to false. Later we will set the value to true *if* we + // confirm that there is a supported corresponding use as an exporter. + recTypes[pipelineID.Type()] = false + } + + for expType := range expTypes { + for recType := range recTypes { + if factory.Stability(expType, recType) != component.StabilityLevelUndefined { + expTypes[expType] = true + recTypes[recType] = true + } + } + } + + for expType, supportedUse := range expTypes { + if !supportedUse { + return fmt.Errorf("connectors::%s: is used as exporter in %q pipeline but is not used in supported receiver pipeline", connID, expType) + } + } + for recType, supportedUse := range recTypes { + if !supportedUse { + return fmt.Errorf("connectors::%s: is used as receiver in %q pipeline but is not used in supported exporter pipeline", connID, recType) + } } return nil diff --git a/otelcol/config_test.go b/otelcol/config_test.go index 3674d0f70b5..0e1277a55e9 100644 --- a/otelcol/config_test.go +++ b/otelcol/config_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" @@ -232,6 +233,83 @@ func TestConfigValidate(t *testing.T) { }, expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as exporter`), }, + { + name: "orphaned-connector-use-as-exporter", + cfgFn: func() *Config { + cfg := generateConfigWithPipelines(pipelines.Config{ + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, + }, + }) + return cfg + }, + expected: errors.New(`connectors::nop/conn: is used as exporter in "metrics" pipeline but is not used in supported receiver pipeline`), + }, + { + name: "orphaned-connector-use-as-receiver", + cfgFn: func() *Config { + cfg := generateConfigWithPipelines(pipelines.Config{ + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, + }, + component.NewIDWithName("metrics", "out"): { + Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }) + return cfg + }, + expected: errors.New(`connectors::nop/conn: is used as receiver in "traces" pipeline but is not used in supported exporter pipeline`), + }, + { + name: "connector-forward", + cfgFn: func() *Config { + cfg := generateConfigWithPipelines(pipelines.Config{ + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, + }, + component.NewIDWithName("metrics", "out"): { + Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }) + return cfg + }, + expected: nil, + }, { name: "invalid-service-config", cfgFn: func() *Config { @@ -246,12 +324,24 @@ func TestConfigValidate(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { cfg := test.cfgFn() - assert.Equal(t, test.expected, cfg.Validate()) + factories, err := nopFactories() + require.NoError(t, err) + assert.Equal(t, test.expected, cfg.Validate(factories)) }) } } func generateConfig() *Config { + return generateConfigWithPipelines(pipelines.Config{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }) +} + +func generateConfigWithPipelines(pipes pipelines.Config) *Config { return &Config{ Receivers: map[component.ID]component.Config{ component.NewID("nop"): &errConfig{}, @@ -286,13 +376,7 @@ func generateConfig() *Config { }, }, Extensions: []component.ID{component.NewID("nop")}, - Pipelines: pipelines.Config{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, + Pipelines: pipes, }, } } diff --git a/otelcol/factories_test.go b/otelcol/factories_test.go index 645d59c7db2..db992cfbaa9 100644 --- a/otelcol/factories_test.go +++ b/otelcol/factories_test.go @@ -4,6 +4,7 @@ package otelcol import ( + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/connector/connectortest" "go.opentelemetry.io/collector/exporter" @@ -20,7 +21,15 @@ func nopFactories() (Factories, error) { var factories Factories var err error - if factories.Connectors, err = connector.MakeFactoryMap(connectortest.NewNopFactory()); err != nil { + if factories.Connectors, err = connector.MakeFactoryMap(connectortest.NewNopFactory( + // Used in TestValidateConfig + connector.WithTracesToTraces(connectortest.CreateTracesToTracesConnector, component.StabilityLevelDevelopment), + connector.WithMetricsToMetrics(connectortest.CreateMetricsToMetricsConnector, component.StabilityLevelDevelopment), + // connector.WithLogsToLogs(connectortest.CreateLogsToLogsConnector, component.StabilityLevelDevelopment), + + // Used in testdata/otelcol-nop.yaml + connector.WithTracesToLogs(connectortest.CreateTracesToLogsConnector, component.StabilityLevelDevelopment), + )); err != nil { return Factories{}, err } diff --git a/otelcol/otelcoltest/config.go b/otelcol/otelcoltest/config.go index 74cdb18f260..ffc3f6a8944 100644 --- a/otelcol/otelcoltest/config.go +++ b/otelcol/otelcoltest/config.go @@ -37,7 +37,7 @@ func LoadConfigAndValidate(fileName string, factories otelcol.Factories) (*otelc if err != nil { return nil, err } - return cfg, cfg.Validate() + return cfg, cfg.Validate(factories) } func makeMapProvidersMap(providers ...confmap.Provider) map[string]confmap.Provider { diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 421aa598611..452c8f8f055 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -101,6 +101,9 @@ func (g *Graph) createNodes(set Settings) { for connID, exprPipelineIDs := range connectorsAsExporter { for _, eID := range exprPipelineIDs { for _, rID := range connectorsAsReceiver[connID] { + if !set.ConnectorBuilder.SupportsConnection(connID.Type(), eID.Type(), rID.Type()) { + continue + } connNode := g.createConnector(eID, rID, connID) g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index b4dcf4f90e6..788152d4564 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -1187,213 +1187,6 @@ func TestGraphBuildErrors(t *testing.T) { }, expected: "failed to create \"bf\" receiver for data type \"traces\": telemetry type is not supported", }, - { - name: "not_supported_connector_traces_traces.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("traces", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from traces to traces: telemetry type is not supported", - }, - { - name: "not_supported_connector_traces_metrics.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("traces", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("metrics", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from traces to metrics: telemetry type is not supported", - }, - { - name: "not_supported_connector_traces_logs.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("traces", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("logs", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from traces to logs: telemetry type is not supported", - }, - { - name: "not_supported_connector_metrics_traces.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("metrics", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from metrics to traces: telemetry type is not supported", - }, - { - name: "not_supported_connector_metrics_metrics.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("metrics", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("metrics", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from metrics to metrics: telemetry type is not supported", - }, - { - name: "not_supported_connector_metrics_logs.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("metrics", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("logs", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from metrics to logs: telemetry type is not supported", - }, - { - name: "not_supported_connector_logs_traces.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("logs", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from logs to traces: telemetry type is not supported", - }, - { - name: "not_supported_connector_logs_metrics.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("logs", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("metrics", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from logs to metrics: telemetry type is not supported", - }, - { - name: "not_supported_connector_logs_logs.yaml", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("logs", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("bf")}, - }, - component.NewIDWithName("logs", "out"): { - Receivers: []component.ID{component.NewID("bf")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector \"bf\" cannot connect from logs to logs: telemetry type is not supported", - }, { name: "not_allowed_simple_cycle_traces.yaml", receiverCfgs: map[component.ID]component.Config{ @@ -1774,29 +1567,6 @@ func TestGraphBuildErrors(t *testing.T) { }, expected: "failed to create \"unknown\" receiver for data type \"logs\": receiver factory not available for: \"unknown\"", }, - { - name: "unknown_connector_factory", - receiverCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - exporterCfgs: map[component.ID]component.Config{ - component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), - }, - connectorCfgs: map[component.ID]component.Config{ - component.NewID("unknown"): nopConnectorFactory.CreateDefaultConfig(), - }, - pipelineCfgs: pipelines.Config{ - component.NewIDWithName("traces", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("unknown")}, - }, - component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewID("unknown")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }, - expected: "connector factory not available for: \"unknown\"", - }, } for _, test := range tests { From f735a61f266d76e4d262af0b00f10a91e8deae82 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 29 Jun 2023 15:53:43 -0400 Subject: [PATCH 2/4] Fix connector validation based on usage in pipelines --- connector/connector.go | 43 --- otelcol/collector.go | 4 +- otelcol/config.go | 111 +----- otelcol/config_test.go | 120 +----- otelcol/factories_test.go | 11 +- otelcol/otelcoltest/config.go | 2 +- service/internal/graph/graph.go | 102 ++++- service/internal/graph/graph_test.go | 358 +++++++++++++++++- .../testcomponents/example_connector.go | 8 + 9 files changed, 475 insertions(+), 284 deletions(-) diff --git a/connector/connector.go b/connector/connector.go index 4d3eab5f56d..3a85add44a6 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -131,8 +131,6 @@ type Factory interface { LogsToMetricsStability() component.StabilityLevel LogsToLogsStability() component.StabilityLevel - Stability(exporterType, receiverType component.Type) component.StabilityLevel - unexportedFactoryFunc() } @@ -439,39 +437,6 @@ func (f factory) LogsToLogsStability() component.StabilityLevel { return f.logsToLogsStabilityLevel } -func (f factory) Stability(expType, recType component.Type) component.StabilityLevel { - switch expType { - case component.DataTypeTraces: - switch recType { - case component.DataTypeTraces: - return f.tracesToTracesStabilityLevel - case component.DataTypeMetrics: - return f.tracesToMetricsStabilityLevel - case component.DataTypeLogs: - return f.tracesToLogsStabilityLevel - } - case component.DataTypeMetrics: - switch recType { - case component.DataTypeTraces: - return f.metricsToTracesStabilityLevel - case component.DataTypeMetrics: - return f.metricsToMetricsStabilityLevel - case component.DataTypeLogs: - return f.metricsToLogsStabilityLevel - } - case component.DataTypeLogs: - switch recType { - case component.DataTypeTraces: - return f.logsToTracesStabilityLevel - case component.DataTypeMetrics: - return f.logsToMetricsStabilityLevel - case component.DataTypeLogs: - return f.logsToLogsStabilityLevel - } - } - return component.StabilityLevelUndefined -} - // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { f := &factory{ @@ -508,14 +473,6 @@ func NewBuilder(cfgs map[component.ID]component.Config, factories map[component. return &Builder{cfgs: cfgs, factories: factories} } -func (b *Builder) SupportsConnection(connType, expType, recType component.Type) bool { - f, exists := b.factories[connType] - if !exists { - return false - } - return f.Stability(expType, recType) != component.StabilityLevelUndefined -} - // CreateTracesToTraces creates a Traces connector based on the settings and config. func (b *Builder) CreateTracesToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) { cfg, existsCfg := b.cfgs[set.ID] diff --git a/otelcol/collector.go b/otelcol/collector.go index dcde4c53ad0..788c022e9f2 100644 --- a/otelcol/collector.go +++ b/otelcol/collector.go @@ -148,7 +148,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { return fmt.Errorf("failed to get config: %w", err) } - if err = cfg.Validate(col.set.Factories); err != nil { + if err = cfg.Validate(); err != nil { return fmt.Errorf("invalid configuration: %w", err) } @@ -198,7 +198,7 @@ func (col *Collector) DryRun(ctx context.Context) error { return fmt.Errorf("failed to get config: %w", err) } - return cfg.Validate(col.set.Factories) + return cfg.Validate() } // Run starts the collector according to the given configuration, and waits for it to complete. diff --git a/otelcol/config.go b/otelcol/config.go index 32eb9ba666f..4f5a7680aed 100644 --- a/otelcol/config.go +++ b/otelcol/config.go @@ -8,7 +8,6 @@ import ( "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/service" ) @@ -42,7 +41,7 @@ type Config struct { // This function performs basic validation of configuration. There may be more subtle // invalid cases that we currently don't check for but which we may want to add in // the future (e.g. disallowing receiving and exporting on the same endpoint). -func (cfg *Config) Validate(factories Factories) error { +func (cfg *Config) Validate() error { // Currently, there is no default receiver enabled. // The configuration must specify at least one receiver to be valid. if len(cfg.Receivers) == 0 { @@ -109,10 +108,6 @@ func (cfg *Config) Validate(factories Factories) error { } } - // Keep track of whether connectors are used as receivers and exporters using map[connectorID][]pipelineID - connectorsAsExporters := make(map[component.ID][]component.ID, len(cfg.Connectors)) - connectorsAsReceivers := make(map[component.ID][]component.ID, len(cfg.Connectors)) - // Check that all pipelines reference only configured components. for pipelineID, pipeline := range cfg.Service.Pipelines { // Validate pipeline receiver name references. @@ -123,7 +118,6 @@ func (cfg *Config) Validate(factories Factories) error { } if _, ok := cfg.Connectors[ref]; ok { - connectorsAsReceivers[ref] = append(connectorsAsReceivers[ref], pipelineID) continue } return fmt.Errorf("service::pipelines::%s: references receiver %q which is not configured", pipelineID, ref) @@ -144,113 +138,10 @@ func (cfg *Config) Validate(factories Factories) error { continue } if _, ok := cfg.Connectors[ref]; ok { - connectorsAsExporters[ref] = append(connectorsAsExporters[ref], pipelineID) continue } return fmt.Errorf("service::pipelines::%s: references exporter %q which is not configured", pipelineID, ref) } } - - // Validate that connectors are used as both receiver and exporter - for connID := range cfg.Connectors { - // TODO must use connector factories to assess validity - expPipelines, expOK := connectorsAsExporters[connID] - recPipelines, recOK := connectorsAsReceivers[connID] - if recOK && !expOK { - return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as exporter", connID) - } - if !recOK && expOK { - return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as receiver", connID) - } - factory := factories.Connectors[connID.Type()] - if err := validateConnectorUse(factory, connID, expPipelines, recPipelines); err != nil { - return err - } - } - - return nil -} - -// Given the set of pipelines in which a connector is used as an exporter and receiver, -// we need to validate that all pipelines are using the connector in a supported way. -// -// For example, consider the following config, in which both logs and metrics are forwarded: -// -// pipelines: -// logs/in: -// receivers: [otlp] -// exporters: [forward] -// logs/out: -// receivers: [forward] -// exporters: [otlp] -// metrics/in: -// receivers: [otlp] -// exporters: [forward] -// metrics/out: -// receivers: [forward] -// exporters: [otlp] -// -// When validating this configuration, we look at each use of the connector and confirm that there -// is a valid corresponding use in another pipeline: -// - As an exporter in logs/in: Valid, because it has a corresponding use as a receiver in logs/out -// - As a receiver in logs/out: Valid, because it has a corresponding use as an exporter in logs/in -// - As an exporter in metrics/in: Valid, because it has a corresponding use as a receiver in metrics/out -// - As a receiver in metrics/out: Valid, because it has a corresponding use as an exporter in metrics/in -// We conclude that it is used correctly, because no uses are unconnected via a supported combination. -// -// Now consider the following config, in which we validation should fail: -// -// pipelines: -// traces/in: -// receivers: [otlp] -// exporters: [forward] -// traces/out: -// receivers: [forward] -// exporters: [otlp] -// metrics/in: -// receivers: [otlp] -// exporters: [forward] -// -// When validating this configuration, we look at each use of the connector and find: -// - As an exporter in traces/in: Valid, because it has a corresponding use as a receiver in traces/out -// - As a receiver in traces/out: Valid, because it has a corresponding use as an exporter in traces/in -// - As an exporter in metrics/in: *Invalid*, because it has a corresponding use as a receiver in a metrics pipeline -// We conclude that it is used incorrectly, because at least one use is unconnected via a supported combination. -func validateConnectorUse(factory connector.Factory, connID component.ID, expPipelines, recPipelines []component.ID) error { - expTypes := make(map[component.DataType]bool) - for _, pipelineID := range expPipelines { - // The presence of each key indicates how the connector is used as an exporter. - // The value is initially set to false. Later we will set the value to true *if* we - // confirm that there is a supported corresponding use as a receiver. - expTypes[pipelineID.Type()] = false - } - recTypes := make(map[component.DataType]bool) - for _, pipelineID := range recPipelines { - // The presence of each key indicates how the connector is used as a receiver. - // The value is initially set to false. Later we will set the value to true *if* we - // confirm that there is a supported corresponding use as an exporter. - recTypes[pipelineID.Type()] = false - } - - for expType := range expTypes { - for recType := range recTypes { - if factory.Stability(expType, recType) != component.StabilityLevelUndefined { - expTypes[expType] = true - recTypes[recType] = true - } - } - } - - for expType, supportedUse := range expTypes { - if !supportedUse { - return fmt.Errorf("connectors::%s: is used as exporter in %q pipeline but is not used in supported receiver pipeline", connID, expType) - } - } - for recType, supportedUse := range recTypes { - if !supportedUse { - return fmt.Errorf("connectors::%s: is used as receiver in %q pipeline but is not used in supported exporter pipeline", connID, recType) - } - } - return nil } diff --git a/otelcol/config_test.go b/otelcol/config_test.go index 0e1277a55e9..924a7f24a50 100644 --- a/otelcol/config_test.go +++ b/otelcol/config_test.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" @@ -213,103 +212,6 @@ func TestConfigValidate(t *testing.T) { }, expected: errors.New(`service::pipelines::traces: references exporter "nop/conn2" which is not configured`), }, - { - name: "missing-connector-as-receiver", - cfgFn: func() *Config { - cfg := generateConfig() - pipe := cfg.Service.Pipelines[component.NewID("traces")] - pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "conn")) - return cfg - }, - expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as receiver`), - }, - { - name: "missing-connector-as-exporter", - cfgFn: func() *Config { - cfg := generateConfig() - pipe := cfg.Service.Pipelines[component.NewID("traces")] - pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "conn")) - return cfg - }, - expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as exporter`), - }, - { - name: "orphaned-connector-use-as-exporter", - cfgFn: func() *Config { - cfg := generateConfigWithPipelines(pipelines.Config{ - component.NewIDWithName("traces", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, - }, - component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - component.NewIDWithName("metrics", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, - }, - }) - return cfg - }, - expected: errors.New(`connectors::nop/conn: is used as exporter in "metrics" pipeline but is not used in supported receiver pipeline`), - }, - { - name: "orphaned-connector-use-as-receiver", - cfgFn: func() *Config { - cfg := generateConfigWithPipelines(pipelines.Config{ - component.NewIDWithName("metrics", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, - }, - component.NewIDWithName("metrics", "out"): { - Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }) - return cfg - }, - expected: errors.New(`connectors::nop/conn: is used as receiver in "traces" pipeline but is not used in supported exporter pipeline`), - }, - { - name: "connector-forward", - cfgFn: func() *Config { - cfg := generateConfigWithPipelines(pipelines.Config{ - component.NewIDWithName("metrics", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, - }, - component.NewIDWithName("metrics", "out"): { - Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - component.NewIDWithName("traces", "in"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, - }, - component.NewIDWithName("traces", "out"): { - Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }) - return cfg - }, - expected: nil, - }, { name: "invalid-service-config", cfgFn: func() *Config { @@ -324,24 +226,12 @@ func TestConfigValidate(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { cfg := test.cfgFn() - factories, err := nopFactories() - require.NoError(t, err) - assert.Equal(t, test.expected, cfg.Validate(factories)) + assert.Equal(t, test.expected, cfg.Validate()) }) } } func generateConfig() *Config { - return generateConfigWithPipelines(pipelines.Config{ - component.NewID("traces"): { - Receivers: []component.ID{component.NewID("nop")}, - Processors: []component.ID{component.NewID("nop")}, - Exporters: []component.ID{component.NewID("nop")}, - }, - }) -} - -func generateConfigWithPipelines(pipes pipelines.Config) *Config { return &Config{ Receivers: map[component.ID]component.Config{ component.NewID("nop"): &errConfig{}, @@ -376,7 +266,13 @@ func generateConfigWithPipelines(pipes pipelines.Config) *Config { }, }, Extensions: []component.ID{component.NewID("nop")}, - Pipelines: pipes, + Pipelines: pipelines.Config{ + component.NewID("traces"): { + Receivers: []component.ID{component.NewID("nop")}, + Processors: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, }, } } diff --git a/otelcol/factories_test.go b/otelcol/factories_test.go index db992cfbaa9..645d59c7db2 100644 --- a/otelcol/factories_test.go +++ b/otelcol/factories_test.go @@ -4,7 +4,6 @@ package otelcol import ( - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/connector/connectortest" "go.opentelemetry.io/collector/exporter" @@ -21,15 +20,7 @@ func nopFactories() (Factories, error) { var factories Factories var err error - if factories.Connectors, err = connector.MakeFactoryMap(connectortest.NewNopFactory( - // Used in TestValidateConfig - connector.WithTracesToTraces(connectortest.CreateTracesToTracesConnector, component.StabilityLevelDevelopment), - connector.WithMetricsToMetrics(connectortest.CreateMetricsToMetricsConnector, component.StabilityLevelDevelopment), - // connector.WithLogsToLogs(connectortest.CreateLogsToLogsConnector, component.StabilityLevelDevelopment), - - // Used in testdata/otelcol-nop.yaml - connector.WithTracesToLogs(connectortest.CreateTracesToLogsConnector, component.StabilityLevelDevelopment), - )); err != nil { + if factories.Connectors, err = connector.MakeFactoryMap(connectortest.NewNopFactory()); err != nil { return Factories{}, err } diff --git a/otelcol/otelcoltest/config.go b/otelcol/otelcoltest/config.go index ffc3f6a8944..74cdb18f260 100644 --- a/otelcol/otelcoltest/config.go +++ b/otelcol/otelcoltest/config.go @@ -37,7 +37,7 @@ func LoadConfigAndValidate(fileName string, factories otelcol.Factories) (*otelc if err != nil { return nil, err } - return cfg, cfg.Validate(factories) + return cfg, cfg.Validate() } func makeMapProvidersMap(providers ...confmap.Provider) map[string]confmap.Provider { diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 452c8f8f055..d57a0b911d7 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -58,13 +58,18 @@ func Build(ctx context.Context, set Settings) (*Graph, error) { exporters: make(map[int64]graph.Node), } } - pipelines.createNodes(set) + if err := pipelines.createNodes(set); err != nil { + return nil, err + } pipelines.createEdges() return pipelines, pipelines.buildComponents(ctx, set) } // Creates a node for each instance of a component and adds it to the graph -func (g *Graph) createNodes(set Settings) { +func (g *Graph) createNodes(set Settings) error { + // Build a list of all connectors for easy reference + connectors := make(map[component.ID]struct{}) + // Keep track of connectors and where they are used. (map[connectorID][]pipelineID) connectorsAsExporter := make(map[component.ID][]component.ID) connectorsAsReceiver := make(map[component.ID][]component.ID) @@ -73,6 +78,7 @@ func (g *Graph) createNodes(set Settings) { pipe := g.pipelines[pipelineID] for _, recvID := range pipelineCfg.Receivers { if set.ConnectorBuilder.IsConfigured(recvID) { + connectors[recvID] = struct{}{} connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) continue } @@ -90,6 +96,7 @@ func (g *Graph) createNodes(set Settings) { for _, exprID := range pipelineCfg.Exporters { if set.ConnectorBuilder.IsConfigured(exprID) { + connectors[exprID] = struct{}{} connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID) continue } @@ -98,10 +105,61 @@ func (g *Graph) createNodes(set Settings) { } } - for connID, exprPipelineIDs := range connectorsAsExporter { - for _, eID := range exprPipelineIDs { + for connID := range connectors { + factory := set.ConnectorBuilder.Factory(connID.Type()) + if factory == nil { + return fmt.Errorf("connector factory not available for: %q", connID.Type()) + } + connFactory := factory.(connector.Factory) + + expTypes := make(map[component.DataType]bool) + for _, pipelineID := range connectorsAsExporter[connID] { + // The presence of each key indicates how the connector is used as an exporter. + // The value is initially set to false. Later we will set the value to true *if* we + // confirm that there is a supported corresponding use as a receiver. + expTypes[pipelineID.Type()] = false + } + recTypes := make(map[component.DataType]bool) + for _, pipelineID := range connectorsAsReceiver[connID] { + // The presence of each key indicates how the connector is used as a receiver. + // The value is initially set to false. Later we will set the value to true *if* we + // confirm that there is a supported corresponding use as an exporter. + recTypes[pipelineID.Type()] = false + } + + for expType := range expTypes { + for recType := range recTypes { + if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined { + expTypes[expType] = true + recTypes[recType] = true + } + } + } + + for expType, supportedUse := range expTypes { + if supportedUse { + continue + } + // Retain previous error message for special case where connector is used once, but incorrectly. + // TODO remove this case if we agree the slightly more generic error message is acceptable. + if len(expTypes) == 1 && len(recTypes) == 1 { + for recType := range recTypes { + return fmt.Errorf("connector %q cannot connect from %s to %s: telemetry type is not supported", connID, expType, recType) + } + } + return fmt.Errorf("connector %q used as exporter in %s pipeline but not used in any supported receiver pipeline", connID, expType) + } + for recType, supportedUse := range recTypes { + if supportedUse { + continue + } + return fmt.Errorf("connector %q used as receiver in %s pipeline but not used in any supported exporter pipeline", connID, recType) + } + + for _, eID := range connectorsAsExporter[connID] { for _, rID := range connectorsAsReceiver[connID] { - if !set.ConnectorBuilder.SupportsConnection(connID.Type(), eID.Type(), rID.Type()) { + if connectorStability(connFactory, eID.Type(), rID.Type()) == component.StabilityLevelUndefined { + // Connector is not supported for this combination, but we know it is used correctly elsewhere continue } connNode := g.createConnector(eID, rID, connID) @@ -110,6 +168,7 @@ func (g *Graph) createNodes(set Settings) { } } } + return nil } func (g *Graph) createReceiver(pipelineType component.DataType, recvID component.ID) *receiverNode { @@ -370,3 +429,36 @@ func cycleErr(err error, cycles [][]graph.Node) error { } return fmt.Errorf("cycle detected: %s", strings.Join(componentDetails, " -> ")) } + +func connectorStability(f connector.Factory, expType, recType component.Type) component.StabilityLevel { + switch expType { + case component.DataTypeTraces: + switch recType { + case component.DataTypeTraces: + return f.TracesToTracesStability() + case component.DataTypeMetrics: + return f.TracesToMetricsStability() + case component.DataTypeLogs: + return f.TracesToLogsStability() + } + case component.DataTypeMetrics: + switch recType { + case component.DataTypeTraces: + return f.MetricsToTracesStability() + case component.DataTypeMetrics: + return f.MetricsToMetricsStability() + case component.DataTypeLogs: + return f.MetricsToLogsStability() + } + case component.DataTypeLogs: + switch recType { + case component.DataTypeTraces: + return f.LogsToTracesStability() + case component.DataTypeMetrics: + return f.LogsToMetricsStability() + case component.DataTypeLogs: + return f.LogsToLogsStability() + } + } + return component.StabilityLevelUndefined +} diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 788152d4564..8b580225796 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -582,6 +582,36 @@ func TestConnectorPipelinesGraph(t *testing.T) { }, expectedPerExporter: 3, }, + { + name: "pipelines_conn_lanes.yaml", + pipelineConfigs: pipelines.Config{ + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("examplereceiver")}, + Exporters: []component.ID{component.NewID("mockforward")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewID("mockforward")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("examplereceiver")}, + Exporters: []component.ID{component.NewID("mockforward")}, + }, + component.NewIDWithName("metrics", "out"): { + Receivers: []component.ID{component.NewID("mockforward")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + component.NewIDWithName("logs", "in"): { + Receivers: []component.ID{component.NewID("examplereceiver")}, + Exporters: []component.ID{component.NewID("mockforward")}, + }, + component.NewIDWithName("logs", "out"): { + Receivers: []component.ID{component.NewID("mockforward")}, + Exporters: []component.ID{component.NewID("exampleexporter")}, + }, + }, + expectedPerExporter: 1, + }, } for _, test := range tests { @@ -622,9 +652,11 @@ func TestConnectorPipelinesGraph(t *testing.T) { component.NewID("exampleconnector"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), component.NewIDWithName("exampleconnector", "fork"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), component.NewIDWithName("exampleconnector", "merge"): testcomponents.ExampleConnectorFactory.CreateDefaultConfig(), + component.NewID("mockforward"): testcomponents.MockForwardConnectorFactory.CreateDefaultConfig(), }, map[component.Type]connector.Factory{ - testcomponents.ExampleConnectorFactory.Type(): testcomponents.ExampleConnectorFactory, + testcomponents.ExampleConnectorFactory.Type(): testcomponents.ExampleConnectorFactory, + testcomponents.MockForwardConnectorFactory.Type(): testcomponents.MockForwardConnectorFactory, }, ), PipelineConfigs: test.pipelineConfigs, @@ -1017,6 +1049,7 @@ func TestGraphBuildErrors(t *testing.T) { nopProcessorFactory := processortest.NewNopFactory() nopExporterFactory := exportertest.NewNopFactory() nopConnectorFactory := connectortest.NewNopFactory() + mfConnectorFactory := testcomponents.MockForwardConnectorFactory badReceiverFactory := newBadReceiverFactory() badProcessorFactory := newBadProcessorFactory() badExporterFactory := newBadExporterFactory() @@ -1187,6 +1220,305 @@ func TestGraphBuildErrors(t *testing.T) { }, expected: "failed to create \"bf\" receiver for data type \"traces\": telemetry type is not supported", }, + { + name: "not_supported_connector_traces_traces.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from traces to traces: telemetry type is not supported", + }, + { + name: "not_supported_connector_traces_metrics.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("metrics", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from traces to metrics: telemetry type is not supported", + }, + { + name: "not_supported_connector_traces_logs.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("logs", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from traces to logs: telemetry type is not supported", + }, + { + name: "not_supported_connector_metrics_traces.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from metrics to traces: telemetry type is not supported", + }, + { + name: "not_supported_connector_metrics_metrics.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("metrics", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from metrics to metrics: telemetry type is not supported", + }, + { + name: "not_supported_connector_metrics_logs.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("logs", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from metrics to logs: telemetry type is not supported", + }, + { + name: "not_supported_connector_logs_traces.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("logs", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from logs to traces: telemetry type is not supported", + }, + { + name: "not_supported_connector_logs_metrics.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("logs", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("metrics", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from logs to metrics: telemetry type is not supported", + }, + { + name: "not_supported_connector_logs_logs.yaml", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("bf"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("logs", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("bf")}, + }, + component.NewIDWithName("logs", "out"): { + Receivers: []component.ID{component.NewID("bf")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector \"bf\" cannot connect from logs to logs: telemetry type is not supported", + }, + { + name: "orphaned-connector-use-as-exporter", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewIDWithName("nop", "conn")}, + }, + }, + expected: `connector "nop/conn" used as exporter in metrics pipeline but not used in any supported receiver pipeline`, + }, + { + name: "orphaned-connector-use-as-receiver", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewIDWithName("nop", "conn"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewIDWithName("nop", "conn")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: `connector "nop/conn" used as receiver in traces pipeline but not used in any supported exporter pipeline`, + }, + { + name: "partially-orphaned-connector-use-as-exporter", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("mockforward"): mfConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("mockforward")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewID("mockforward")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("mockforward")}, + }, + }, + expected: `connector "mockforward" used as exporter in metrics pipeline but not used in any supported receiver pipeline`, + }, + { + name: "partially-orphaned-connector-use-as-receiver", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopExporterFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("mockforward"): mfConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("metrics", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("mockforward")}, + }, + component.NewIDWithName("metrics", "out"): { + Receivers: []component.ID{component.NewID("mockforward")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewID("mockforward")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: `connector "mockforward" used as receiver in traces pipeline but not used in any supported exporter pipeline`, + }, { name: "not_allowed_simple_cycle_traces.yaml", receiverCfgs: map[component.ID]component.Config{ @@ -1567,6 +1899,29 @@ func TestGraphBuildErrors(t *testing.T) { }, expected: "failed to create \"unknown\" receiver for data type \"logs\": receiver factory not available for: \"unknown\"", }, + { + name: "unknown_connector_factory", + receiverCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + exporterCfgs: map[component.ID]component.Config{ + component.NewID("nop"): nopReceiverFactory.CreateDefaultConfig(), + }, + connectorCfgs: map[component.ID]component.Config{ + component.NewID("unknown"): nopConnectorFactory.CreateDefaultConfig(), + }, + pipelineCfgs: pipelines.Config{ + component.NewIDWithName("traces", "in"): { + Receivers: []component.ID{component.NewID("nop")}, + Exporters: []component.ID{component.NewID("unknown")}, + }, + component.NewIDWithName("traces", "out"): { + Receivers: []component.ID{component.NewID("unknown")}, + Exporters: []component.ID{component.NewID("nop")}, + }, + }, + expected: "connector factory not available for: \"unknown\"", + }, } for _, test := range tests { @@ -1597,6 +1952,7 @@ func TestGraphBuildErrors(t *testing.T) { map[component.Type]connector.Factory{ nopConnectorFactory.Type(): nopConnectorFactory, badConnectorFactory.Type(): badConnectorFactory, + mfConnectorFactory.Type(): mfConnectorFactory, }), PipelineConfigs: test.pipelineCfgs, } diff --git a/service/internal/testcomponents/example_connector.go b/service/internal/testcomponents/example_connector.go index 4d1cc5ac6b6..ccf6d678d28 100644 --- a/service/internal/testcomponents/example_connector.go +++ b/service/internal/testcomponents/example_connector.go @@ -35,6 +35,14 @@ var ExampleConnectorFactory = connector.NewFactory( connector.WithLogsToLogs(createExampleLogsToLogs, component.StabilityLevelDevelopment), ) +var MockForwardConnectorFactory = connector.NewFactory( + "mockforward", + createExampleConnectorDefaultConfig, + connector.WithTracesToTraces(createExampleTracesToTraces, component.StabilityLevelDevelopment), + connector.WithMetricsToMetrics(createExampleMetricsToMetrics, component.StabilityLevelDevelopment), + connector.WithLogsToLogs(createExampleLogsToLogs, component.StabilityLevelDevelopment), +) + func createExampleConnectorDefaultConfig() component.Config { return &struct{}{} } From 48d34135917cf5e8f6ea5ade77b04faeeced94f8 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Wed, 5 Jul 2023 09:22:28 -0400 Subject: [PATCH 3/4] Revert unnecessary changes to connectortest --- connector/connectortest/connector.go | 43 ++++++++++++---------------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/connector/connectortest/connector.go b/connector/connectortest/connector.go index 0b85178c7bc..137a353423c 100644 --- a/connector/connectortest/connector.go +++ b/connector/connectortest/connector.go @@ -26,62 +26,57 @@ func NewNopCreateSettings() connector.CreateSettings { type nopConfig struct{} // NewNopFactory returns a connector.Factory that constructs nop processors. -func NewNopFactory(opts ...connector.FactoryOption) connector.Factory { - if len(opts) == 0 { - opts = []connector.FactoryOption{ - connector.WithTracesToTraces(CreateTracesToTracesConnector, component.StabilityLevelDevelopment), - connector.WithTracesToMetrics(CreateTracesToMetricsConnector, component.StabilityLevelDevelopment), - connector.WithTracesToLogs(CreateTracesToLogsConnector, component.StabilityLevelDevelopment), - connector.WithMetricsToTraces(CreateMetricsToTracesConnector, component.StabilityLevelDevelopment), - connector.WithMetricsToMetrics(CreateMetricsToMetricsConnector, component.StabilityLevelDevelopment), - connector.WithMetricsToLogs(CreateMetricsToLogsConnector, component.StabilityLevelDevelopment), - connector.WithLogsToTraces(CreateLogsToTracesConnector, component.StabilityLevelDevelopment), - connector.WithLogsToMetrics(CreateLogsToMetricsConnector, component.StabilityLevelDevelopment), - connector.WithLogsToLogs(CreateLogsToLogsConnector, component.StabilityLevelDevelopment), - } - } +func NewNopFactory() connector.Factory { return connector.NewFactory( "nop", func() component.Config { return &nopConfig{} }, - opts..., + connector.WithTracesToTraces(createTracesToTracesConnector, component.StabilityLevelDevelopment), + connector.WithTracesToMetrics(createTracesToMetricsConnector, component.StabilityLevelDevelopment), + connector.WithTracesToLogs(createTracesToLogsConnector, component.StabilityLevelDevelopment), + connector.WithMetricsToTraces(createMetricsToTracesConnector, component.StabilityLevelDevelopment), + connector.WithMetricsToMetrics(createMetricsToMetricsConnector, component.StabilityLevelDevelopment), + connector.WithMetricsToLogs(createMetricsToLogsConnector, component.StabilityLevelDevelopment), + connector.WithLogsToTraces(createLogsToTracesConnector, component.StabilityLevelDevelopment), + connector.WithLogsToMetrics(createLogsToMetricsConnector, component.StabilityLevelDevelopment), + connector.WithLogsToLogs(createLogsToLogsConnector, component.StabilityLevelDevelopment), ) } -func CreateTracesToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Traces, error) { +func createTracesToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Traces, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func CreateTracesToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Traces, error) { +func createTracesToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Traces, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func CreateTracesToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Traces, error) { +func createTracesToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Traces, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func CreateMetricsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Metrics, error) { +func createMetricsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Metrics, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func CreateMetricsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Metrics, error) { +func createMetricsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Metrics, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func CreateMetricsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Metrics, error) { +func createMetricsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Metrics, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func CreateLogsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Logs, error) { +func createLogsToTracesConnector(context.Context, connector.CreateSettings, component.Config, consumer.Traces) (connector.Logs, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func CreateLogsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Logs, error) { +func createLogsToMetricsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Metrics) (connector.Logs, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } -func CreateLogsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Logs, error) { +func createLogsToLogsConnector(context.Context, connector.CreateSettings, component.Config, consumer.Logs) (connector.Logs, error) { return &nopConnector{Consumer: consumertest.NewNop()}, nil } From 9fc673b7abce1554eae7114586812a779959e8cf Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Wed, 5 Jul 2023 09:26:16 -0400 Subject: [PATCH 4/4] Remove special case error message and update tests accordingly --- service/internal/graph/graph.go | 7 ------- service/internal/graph/graph_test.go | 18 +++++++++--------- 2 files changed, 9 insertions(+), 16 deletions(-) diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index d57a0b911d7..5b8a404d66f 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -140,13 +140,6 @@ func (g *Graph) createNodes(set Settings) error { if supportedUse { continue } - // Retain previous error message for special case where connector is used once, but incorrectly. - // TODO remove this case if we agree the slightly more generic error message is acceptable. - if len(expTypes) == 1 && len(recTypes) == 1 { - for recType := range recTypes { - return fmt.Errorf("connector %q cannot connect from %s to %s: telemetry type is not supported", connID, expType, recType) - } - } return fmt.Errorf("connector %q used as exporter in %s pipeline but not used in any supported receiver pipeline", connID, expType) } for recType, supportedUse := range recTypes { diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 8b580225796..1279ff145d5 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -1241,7 +1241,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from traces to traces: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in traces pipeline but not used in any supported receiver pipeline", }, { name: "not_supported_connector_traces_metrics.yaml", @@ -1264,7 +1264,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from traces to metrics: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in traces pipeline but not used in any supported receiver pipeline", }, { name: "not_supported_connector_traces_logs.yaml", @@ -1287,7 +1287,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from traces to logs: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in traces pipeline but not used in any supported receiver pipeline", }, { name: "not_supported_connector_metrics_traces.yaml", @@ -1310,7 +1310,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from metrics to traces: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in metrics pipeline but not used in any supported receiver pipeline", }, { name: "not_supported_connector_metrics_metrics.yaml", @@ -1333,7 +1333,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from metrics to metrics: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in metrics pipeline but not used in any supported receiver pipeline", }, { name: "not_supported_connector_metrics_logs.yaml", @@ -1356,7 +1356,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from metrics to logs: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in metrics pipeline but not used in any supported receiver pipeline", }, { name: "not_supported_connector_logs_traces.yaml", @@ -1379,7 +1379,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from logs to traces: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in logs pipeline but not used in any supported receiver pipeline", }, { name: "not_supported_connector_logs_metrics.yaml", @@ -1402,7 +1402,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from logs to metrics: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in logs pipeline but not used in any supported receiver pipeline", }, { name: "not_supported_connector_logs_logs.yaml", @@ -1425,7 +1425,7 @@ func TestGraphBuildErrors(t *testing.T) { Exporters: []component.ID{component.NewID("nop")}, }, }, - expected: "connector \"bf\" cannot connect from logs to logs: telemetry type is not supported", + expected: "connector \"bf\" used as exporter in logs pipeline but not used in any supported receiver pipeline", }, { name: "orphaned-connector-use-as-exporter",