Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created dry run flag for collector #6445

16 changes: 16 additions & 0 deletions .chloggen/add-dry-run-flag-validate-all-fields.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added dry run flag to validate config file without running collector.

# One or more tracking issues or pull requests related to the change
issues: [6445]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
91 changes: 91 additions & 0 deletions config/moved_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,97 @@ func (cfg *Config) validateService() error {
}
return nil
}
func (cfg *Config) DryRunValidate() {
// Currently, there is no default receiver enabled.
// The configuration must specify at least one receiver to be valid.
if len(cfg.Receivers) == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each one of the conditions needs its test to ensure its correctness.

fmt.Printf("**..%v\n", errMissingReceivers)
} else {
for recvID, recvCfg := range cfg.Receivers {
if err := recvCfg.Validate(); err != nil {
fmt.Printf("**..receiver %q has invalid configuration: %v\n", recvID, err)
}
}
}

// Currently, there is no default exporter enabled.
// The configuration must specify at least one exporter to be valid.
if len(cfg.Exporters) == 0 {
fmt.Printf("**..%v\n", errMissingExporters)
} else {
for expID, expCfg := range cfg.Exporters {
if err := expCfg.Validate(); err != nil {
fmt.Printf("**..exporter %q has invalid configuration: %v\n", expID, err)
}
}
}

// Validate the processor configuration.
for procID, procCfg := range cfg.Processors {
if err := procCfg.Validate(); err != nil {
fmt.Printf("**..processor %q has invalid configuration: %v\n", procID, err)
}
}

// Validate the extension configuration.
for extID, extCfg := range cfg.Extensions {
if err := extCfg.Validate(); err != nil {
fmt.Printf("**..extension %q has invalid configuration: %v\n", extID, err)
}
}
cfg.dryRunValidateService()
}

func (cfg *Config) dryRunValidateService() {

for _, ref := range cfg.Service.Extensions {
// Check that the name referenced in the Service extensions exists in the top-level extensions.
if cfg.Extensions[ref] == nil {
fmt.Printf("**..service references extension %q which does not exist\n", ref)
}
}

// Must have at least one pipeline.
if len(cfg.Service.Pipelines) == 0 {
fmt.Printf("**..%v\n", errMissingServicePipelines)
} else {
for pipelineID, pipeline := range cfg.Service.Pipelines {
if pipelineID.Type() != TracesDataType && pipelineID.Type() != MetricsDataType && pipelineID.Type() != LogsDataType {
fmt.Printf("**..unknown pipeline datatype %q for %v\n", pipelineID.Type(), pipelineID)
}

// Validate pipeline has at least one receiver.
if len(pipeline.Receivers) == 0 {
fmt.Printf("**..pipeline %q must have at least one receiver\n", pipelineID)
} else {
for _, ref := range pipeline.Receivers {
// Check that the name referenced in the pipeline's receivers exists in the top-level receivers.
if cfg.Receivers[ref] == nil {
fmt.Printf("**..pipeline %q references receiver %q which does not exist\n", pipelineID, ref)
}
}
}
for _, ref := range pipeline.Processors {
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
if cfg.Processors[ref] == nil {
fmt.Printf("**..pipeline %q references processor %q which does not exist\n", pipelineID, ref)
}
}

// Validate pipeline has at least one exporter.
if len(pipeline.Exporters) == 0 {
fmt.Printf("**..pipeline %q must have at least one exporter\n", pipelineID)
} else {
for _, ref := range pipeline.Exporters {
// Check that the name referenced in the pipeline's Exporters exists in the top-level Exporters.
if cfg.Exporters[ref] == nil {
fmt.Printf("**..pipeline %q references exporter %q which does not exist\n", pipelineID, ref)
}
}
}
}
}
}

// Service defines the configurable components of the service.
// Deprecated: [v0.52.0] Use service.ConfigService
Expand Down
8 changes: 7 additions & 1 deletion service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,10 @@ key:

1. Does not support setting a key that contains a dot `.`.
2. Does not support setting a key that contains a equal sign `=`.
3. The configuration key separator inside the value part of the property is "::". For example `--set "name={a::b: c}"` is equivalent with `--set name.a.b=c`.
3. The configuration key separator inside the value part of the property is "::". For example `--set "name={a::b: c}"` is equivalent with `--set name.a.b=c`.

## How to validate configuration file and return all errors without running collector

```bash
./otelcorecol --config=file:examples/local/otel-config.yaml --dry-run
```
17 changes: 17 additions & 0 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"os/signal"
"syscall"

"go.opentelemetry.io/collector/service/internal/configunmarshaler"

"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -157,6 +159,21 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {

// Run starts the collector according to the given configuration, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.

func (col *Collector) DryRun(ctx context.Context) error {
_, err, errorcheck := col.set.ConfigProvider.DryRunGet(ctx, col.set.Factories)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
}

if errorcheck {
ExtensionValidateErrors, ExportersValidateErrors, ReceiversValidateErrors, ProcessorsValidateErrors := configunmarshaler.ReturnValidateErrors()
printMarshalErrors(ExtensionValidateErrors, ExportersValidateErrors, ReceiversValidateErrors, ProcessorsValidateErrors)
return nil
}
return nil
}

func (col *Collector) Run(ctx context.Context) error {
if err := col.setupConfigurationComponents(ctx); err != nil {
col.setCollectorState(Closed)
Expand Down
3 changes: 3 additions & 0 deletions service/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func NewCommand(set CollectorSettings) *cobra.Command {
if err != nil {
return err
}
if dryRun {
return col.DryRun(cmd.Context())
}
return col.Run(cmd.Context())
},
}
Expand Down
20 changes: 20 additions & 0 deletions service/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type ConfigProvider interface {
//
// Should never be called concurrently with itself or Get.
Shutdown(ctx context.Context) error
// DryRunGet validates the configuration, prints out all related errors without running the collector
DryRunGet(ctx context.Context, factories component.Factories) (*Config, error, bool)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should "dry run" be a flag or a command?

/cc @codeboten @jpkrohling who also reviewed #6322

Copy link
Contributor

@codeboten codeboten Nov 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would expect it to be a flag

Update: after looking at what helm does, i think it should just be a command. I would rename dry-run to verify or validate since that's what we're really asking the command to do here.

dry-run only really applies to the context of a command that would change a state somewhere.

}

type configProvider struct {
Expand Down Expand Up @@ -116,6 +118,24 @@ func (cm *configProvider) Get(ctx context.Context, factories component.Factories

return cfg, nil
}
func (cm *configProvider) DryRunGet(ctx context.Context, factories component.Factories) (*Config, error, bool) {
retMap, err := cm.mapResolver.Resolve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot resolve the configuration: %w", err), false
}

var cfg *Config
cfg, err, errorcheck := configunmarshaler.New().DryRunUnmarshal(retMap, factories)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err), false
}
if errorcheck {
return nil, nil, errorcheck
}

cfg.DryRunValidate()
return nil, nil, false
}

func (cm *configProvider) Watch() <-chan error {
return cm.mapResolver.Watch()
Expand Down
36 changes: 36 additions & 0 deletions service/dry_run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package service

import (
"fmt"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/internal/configunmarshaler"
)

func printMarshalErrors(exterrs []configunmarshaler.ExtensionValidateError, experrs []configunmarshaler.ExporterValidateError, recerrs []configunmarshaler.ReceiverValidateError, procerrs []configunmarshaler.ProcessorValidateError) {
count := 1
for _, exterr := range exterrs {
validateErrorText(count, exterr.Component, exterr.Id, exterr.Err)
count++
}
for _, experr := range experrs {
validateErrorText(count, experr.Component, experr.Id, experr.Err)
count++
}
for _, recerr := range recerrs {
validateErrorText(count, recerr.Component, recerr.Id, recerr.Err)
count++
}
for _, procerr := range procerrs {
validateErrorText(count, procerr.Component, procerr.Id, procerr.Err)
count++
}
}

func validateErrorText(count int, idType string, id config.ComponentID, err string) {
fmt.Printf("Error %d\n", count)
fmt.Printf("=============\n")
fmt.Printf("%s:\n", idType)
fmt.Printf(" %s\n", id)
fmt.Printf(" ^---^--- %s\n", err)
}
4 changes: 4 additions & 0 deletions service/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ import (
const (
configFlag = "config"
featureGatesFlag = "feature-gates"
dryRunFlag = "dry-run"
)

var dryRun bool

type configFlagValue struct {
values []string
sets []string
Expand Down Expand Up @@ -64,6 +67,7 @@ func flags() *flag.FlagSet {
flagSet.Var(featuregate.FlagValue{}, featureGatesFlag,
"Comma-delimited list of feature gate identifiers. Prefix with '-' to disable the feature. '+' or no prefix will enable the feature.")

flagSet.BoolVar(&dryRun, dryRunFlag, false, "Validate configuration without running the calculator, must be used with the config flag!")
return flagSet
}

Expand Down
43 changes: 22 additions & 21 deletions service/internal/configunmarshaler/defaultunmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,32 @@ func New() ConfigUnmarshaler {
return ConfigUnmarshaler{}
}

var rawCfg = configSettings{
// TODO: Add a component.ServiceFactory to allow this to be defined by the Service.
Service: config.Service{
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]interface{}(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: ":8888",
},
},
},
}

// Unmarshal the config.Config from a confmap.Conf.
// After the config is unmarshalled, `Validate()` must be called to validate.
func (ConfigUnmarshaler) Unmarshal(v *confmap.Conf, factories component.Factories) (*config.Config, error) {
// Unmarshal top level sections and validate.
rawCfg := configSettings{
// TODO: Add a component.ServiceFactory to allow this to be defined by the Service.
Service: config.Service{
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]interface{}(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: ":8888",
},
},
},
}
if err := v.Unmarshal(&rawCfg, confmap.WithErrorUnused()); err != nil {
return nil, configError{
error: fmt.Errorf("error reading top level configuration sections: %w", err),
Expand Down
Loading