Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connector validation based on usage in pipelines #8004

Merged
merged 4 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .chloggen/connector-validation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: connector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix connector validation

# One or more tracking issues or pull requests related to the change
issues: [7892]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Validation of connectors was too aggressive such that a connector that was used in any combination of unsupported roles would fail.
Instead, validation should pass as long as each use of the connector has a supported corresponding use.
19 changes: 0 additions & 19 deletions otelcol/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ func (cfg *Config) Validate() error {
}
}

// Keep track of whether connectors are used as receivers and exporters
connectorsAsReceivers := make(map[component.ID]struct{}, len(cfg.Connectors))
connectorsAsExporters := make(map[component.ID]struct{}, len(cfg.Connectors))

// Check that all pipelines reference only configured components.
for pipelineID, pipeline := range cfg.Service.Pipelines {
// Validate pipeline receiver name references.
Expand All @@ -122,7 +118,6 @@ func (cfg *Config) Validate() error {
}

if _, ok := cfg.Connectors[ref]; ok {
connectorsAsReceivers[ref] = struct{}{}
continue
}
return fmt.Errorf("service::pipelines::%s: references receiver %q which is not configured", pipelineID, ref)
Expand All @@ -143,24 +138,10 @@ func (cfg *Config) Validate() error {
continue
}
if _, ok := cfg.Connectors[ref]; ok {
connectorsAsExporters[ref] = struct{}{}
continue
}
return fmt.Errorf("service::pipelines::%s: references exporter %q which is not configured", pipelineID, ref)
}
}

// Validate that connectors are used as both receiver and exporter
for connID := range cfg.Connectors {
_, recOK := connectorsAsReceivers[connID]
_, expOK := connectorsAsExporters[connID]
if recOK && !expOK {
return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as exporter", connID)
}
if !recOK && expOK {
return fmt.Errorf("connectors::%s: must be used as both receiver and exporter but is not used as receiver", connID)
}
}

return nil
}
20 changes: 0 additions & 20 deletions otelcol/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,26 +212,6 @@ func TestConfigValidate(t *testing.T) {
},
expected: errors.New(`service::pipelines::traces: references exporter "nop/conn2" which is not configured`),
},
{
name: "missing-connector-as-receiver",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Exporters = append(pipe.Exporters, component.NewIDWithName("nop", "conn"))
return cfg
},
expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as receiver`),
},
{
name: "missing-connector-as-exporter",
cfgFn: func() *Config {
cfg := generateConfig()
pipe := cfg.Service.Pipelines[component.NewID("traces")]
pipe.Receivers = append(pipe.Receivers, component.NewIDWithName("nop", "conn"))
return cfg
},
expected: errors.New(`connectors::nop/conn: must be used as both receiver and exporter but is not used as exporter`),
},
{
name: "invalid-service-config",
cfgFn: func() *Config {
Expand Down
96 changes: 92 additions & 4 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@ func Build(ctx context.Context, set Settings) (*Graph, error) {
exporters: make(map[int64]graph.Node),
}
}
pipelines.createNodes(set)
if err := pipelines.createNodes(set); err != nil {
return nil, err
}
pipelines.createEdges()
return pipelines, pipelines.buildComponents(ctx, set)
}

// Creates a node for each instance of a component and adds it to the graph
func (g *Graph) createNodes(set Settings) {
func (g *Graph) createNodes(set Settings) error {
// Build a list of all connectors for easy reference
connectors := make(map[component.ID]struct{})

// Keep track of connectors and where they are used. (map[connectorID][]pipelineID)
connectorsAsExporter := make(map[component.ID][]component.ID)
connectorsAsReceiver := make(map[component.ID][]component.ID)
Expand All @@ -73,6 +78,7 @@ func (g *Graph) createNodes(set Settings) {
pipe := g.pipelines[pipelineID]
for _, recvID := range pipelineCfg.Receivers {
if set.ConnectorBuilder.IsConfigured(recvID) {
connectors[recvID] = struct{}{}
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
continue
}
Expand All @@ -90,6 +96,7 @@ func (g *Graph) createNodes(set Settings) {

for _, exprID := range pipelineCfg.Exporters {
if set.ConnectorBuilder.IsConfigured(exprID) {
connectors[exprID] = struct{}{}
connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID)
continue
}
Expand All @@ -98,15 +105,63 @@ func (g *Graph) createNodes(set Settings) {
}
}

for connID, exprPipelineIDs := range connectorsAsExporter {
for _, eID := range exprPipelineIDs {
for connID := range connectors {
factory := set.ConnectorBuilder.Factory(connID.Type())
if factory == nil {
return fmt.Errorf("connector factory not available for: %q", connID.Type())
}
connFactory := factory.(connector.Factory)

expTypes := make(map[component.DataType]bool)
for _, pipelineID := range connectorsAsExporter[connID] {
// The presence of each key indicates how the connector is used as an exporter.
// The value is initially set to false. Later we will set the value to true *if* we
// confirm that there is a supported corresponding use as a receiver.
expTypes[pipelineID.Type()] = false
}
recTypes := make(map[component.DataType]bool)
for _, pipelineID := range connectorsAsReceiver[connID] {
// The presence of each key indicates how the connector is used as a receiver.
// The value is initially set to false. Later we will set the value to true *if* we
// confirm that there is a supported corresponding use as an exporter.
recTypes[pipelineID.Type()] = false
}

for expType := range expTypes {
for recType := range recTypes {
if connectorStability(connFactory, expType, recType) != component.StabilityLevelUndefined {
expTypes[expType] = true
recTypes[recType] = true
}
}
}

for expType, supportedUse := range expTypes {
if supportedUse {
continue
}
return fmt.Errorf("connector %q used as exporter in %s pipeline but not used in any supported receiver pipeline", connID, expType)
}
for recType, supportedUse := range recTypes {
if supportedUse {
continue
}
return fmt.Errorf("connector %q used as receiver in %s pipeline but not used in any supported exporter pipeline", connID, recType)
}

for _, eID := range connectorsAsExporter[connID] {
for _, rID := range connectorsAsReceiver[connID] {
if connectorStability(connFactory, eID.Type(), rID.Type()) == component.StabilityLevelUndefined {
// Connector is not supported for this combination, but we know it is used correctly elsewhere
continue
}
connNode := g.createConnector(eID, rID, connID)
g.pipelines[eID].exporters[connNode.ID()] = connNode
g.pipelines[rID].receivers[connNode.ID()] = connNode
}
}
}
return nil
}

func (g *Graph) createReceiver(pipelineType component.DataType, recvID component.ID) *receiverNode {
Expand Down Expand Up @@ -367,3 +422,36 @@ func cycleErr(err error, cycles [][]graph.Node) error {
}
return fmt.Errorf("cycle detected: %s", strings.Join(componentDetails, " -> "))
}

func connectorStability(f connector.Factory, expType, recType component.Type) component.StabilityLevel {
switch expType {
case component.DataTypeTraces:
switch recType {
case component.DataTypeTraces:
return f.TracesToTracesStability()
case component.DataTypeMetrics:
return f.TracesToMetricsStability()
case component.DataTypeLogs:
return f.TracesToLogsStability()
}
case component.DataTypeMetrics:
switch recType {
case component.DataTypeTraces:
return f.MetricsToTracesStability()
case component.DataTypeMetrics:
return f.MetricsToMetricsStability()
case component.DataTypeLogs:
return f.MetricsToLogsStability()
}
case component.DataTypeLogs:
switch recType {
case component.DataTypeTraces:
return f.LogsToTracesStability()
case component.DataTypeMetrics:
return f.LogsToMetricsStability()
case component.DataTypeLogs:
return f.LogsToLogsStability()
}
}
return component.StabilityLevelUndefined
}
Loading