Skip to content

Commit

Permalink
Created dry run for collector, fixes open-telemetry#4613
Browse files Browse the repository at this point in the history
Signed-off-by: Maureen <[email protected]>
  • Loading branch information
Chinwendu20 committed Oct 30, 2022
1 parent 7fa47b4 commit dd53f75
Show file tree
Hide file tree
Showing 8 changed files with 426 additions and 21 deletions.
91 changes: 91 additions & 0 deletions config/moved_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,97 @@ func (cfg *Config) validateService() error {
}
return nil
}
func (cfg *Config) DryRunValidate() {
// Currently, there is no default receiver enabled.
// The configuration must specify at least one receiver to be valid.
if len(cfg.Receivers) == 0 {
fmt.Printf("**..%v\n", errMissingReceivers)
} else {
for recvID, recvCfg := range cfg.Receivers {
if err := recvCfg.Validate(); err != nil {
fmt.Printf("**..receiver %q has invalid configuration: %v\n", recvID, err)
}
}
}

// Currently, there is no default exporter enabled.
// The configuration must specify at least one exporter to be valid.
if len(cfg.Exporters) == 0 {
fmt.Printf("**..%v\n", errMissingExporters)
} else {
for expID, expCfg := range cfg.Exporters {
if err := expCfg.Validate(); err != nil {
fmt.Printf("**..exporter %q has invalid configuration: %v\n", expID, err)
}
}
}

// Validate the processor configuration.
for procID, procCfg := range cfg.Processors {
if err := procCfg.Validate(); err != nil {
fmt.Printf("**..processor %q has invalid configuration: %v\n", procID, err)
}
}

// Validate the extension configuration.
for extID, extCfg := range cfg.Extensions {
if err := extCfg.Validate(); err != nil {
fmt.Printf("**..extension %q has invalid configuration: %v\n", extID, err)
}
}
cfg.dryRunValidateService()
}

func (cfg *Config) dryRunValidateService() {

for _, ref := range cfg.Service.Extensions {
// Check that the name referenced in the Service extensions exists in the top-level extensions.
if cfg.Extensions[ref] == nil {
fmt.Printf("**..service references extension %q which does not exist\n", ref)
}
}

// Must have at least one pipeline.
if len(cfg.Service.Pipelines) == 0 {
fmt.Printf("**..%v\n", errMissingServicePipelines)
} else {
for pipelineID, pipeline := range cfg.Service.Pipelines {
if pipelineID.Type() != TracesDataType && pipelineID.Type() != MetricsDataType && pipelineID.Type() != LogsDataType {
fmt.Printf("**..unknown pipeline datatype %q for %v\n", pipelineID.Type(), pipelineID)
}

// Validate pipeline has at least one receiver.
if len(pipeline.Receivers) == 0 {
fmt.Printf("**..pipeline %q must have at least one receiver\n", pipelineID)
} else {
for _, ref := range pipeline.Receivers {
// Check that the name referenced in the pipeline's receivers exists in the top-level receivers.
if cfg.Receivers[ref] == nil {
fmt.Printf("**..pipeline %q references receiver %q which does not exist\n", pipelineID, ref)
}
}
}
for _, ref := range pipeline.Processors {
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
if cfg.Processors[ref] == nil {
fmt.Printf("**..pipeline %q references processor %q which does not exist\n", pipelineID, ref)
}
}

// Validate pipeline has at least one exporter.
if len(pipeline.Exporters) == 0 {
fmt.Printf("**..pipeline %q must have at least one exporter\n", pipelineID)
} else {
for _, ref := range pipeline.Exporters {
// Check that the name referenced in the pipeline's Exporters exists in the top-level Exporters.
if cfg.Exporters[ref] == nil {
fmt.Printf("**..pipeline %q references exporter %q which does not exist\n", pipelineID, ref)
}
}
}
}
}
}

// Service defines the configurable components of the service.
// Deprecated: [v0.52.0] Use service.ConfigService
Expand Down
17 changes: 17 additions & 0 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"os/signal"
"syscall"

"go.opentelemetry.io/collector/service/internal/configunmarshaler"

"go.uber.org/atomic"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -157,6 +159,21 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {

// Run starts the collector according to the given configuration, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.

func (col *Collector) DryRun(ctx context.Context) error {
_, err, errorcheck := col.set.ConfigProvider.DryRunGet(ctx, col.set.Factories)
if err != nil {
return fmt.Errorf("failed to get config: %w", err)
}

if errorcheck {
ExtensionValidateErrors, ExportersValidateErrors, ReceiversValidateErrors, ProcessorsValidateErrors := configunmarshaler.ReturnValidateErrors()
printMarshalErrors(ExtensionValidateErrors, ExportersValidateErrors, ReceiversValidateErrors, ProcessorsValidateErrors)
return nil
}
return nil
}

func (col *Collector) Run(ctx context.Context) error {
if err := col.setupConfigurationComponents(ctx); err != nil {
col.setCollectorState(Closed)
Expand Down
3 changes: 3 additions & 0 deletions service/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func NewCommand(set CollectorSettings) *cobra.Command {
if err != nil {
return err
}
if dryRun {
return col.DryRun(cmd.Context())
}
return col.Run(cmd.Context())
},
}
Expand Down
20 changes: 20 additions & 0 deletions service/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type ConfigProvider interface {
//
// Should never be called concurrently with itself or Get.
Shutdown(ctx context.Context) error
// DryRunGet validates the configuration, prints out all related errors without running the collector
DryRunGet(ctx context.Context, factories component.Factories) (*Config, error, bool)
}

type configProvider struct {
Expand Down Expand Up @@ -116,6 +118,24 @@ func (cm *configProvider) Get(ctx context.Context, factories component.Factories

return cfg, nil
}
func (cm *configProvider) DryRunGet(ctx context.Context, factories component.Factories) (*Config, error, bool) {
retMap, err := cm.mapResolver.Resolve(ctx)
if err != nil {
return nil, fmt.Errorf("cannot resolve the configuration: %w", err), false
}

var cfg *Config
cfg, err, errorcheck := configunmarshaler.New().DryRunUnmarshal(retMap, factories)
if err != nil {
return nil, fmt.Errorf("cannot unmarshal the configuration: %w", err), false
}
if errorcheck {
return nil, nil, errorcheck
}

cfg.DryRunValidate()
return nil, nil, false
}

func (cm *configProvider) Watch() <-chan error {
return cm.mapResolver.Watch()
Expand Down
36 changes: 36 additions & 0 deletions service/dry_run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package service

import (
"fmt"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/internal/configunmarshaler"
)

func printMarshalErrors(exterrs []configunmarshaler.ExtensionValidateError, experrs []configunmarshaler.ExporterValidateError, recerrs []configunmarshaler.ReceiverValidateError, procerrs []configunmarshaler.ProcessorValidateError) {
count := 1
for _, exterr := range exterrs {
validateErrorText(count, exterr.Component, exterr.Id, exterr.Err)
count++
}
for _, experr := range experrs {
validateErrorText(count, experr.Component, experr.Id, experr.Err)
count++
}
for _, recerr := range recerrs {
validateErrorText(count, recerr.Component, recerr.Id, recerr.Err)
count++
}
for _, procerr := range procerrs {
validateErrorText(count, procerr.Component, procerr.Id, procerr.Err)
count++
}
}

func validateErrorText(count int, idType string, id config.ComponentID, err string) {
fmt.Printf("Error %d\n", count)
fmt.Printf("=============\n")
fmt.Printf("%s:\n", idType)
fmt.Printf(" %s\n", id)
fmt.Printf(" ^---^--- %s\n", err)
}
4 changes: 4 additions & 0 deletions service/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@ import (
const (
configFlag = "config"
featureGatesFlag = "feature-gates"
dryRunFlag = "dry-run"
)

var dryRun bool

type configFlagValue struct {
values []string
sets []string
Expand Down Expand Up @@ -64,6 +67,7 @@ func flags() *flag.FlagSet {
flagSet.Var(featuregate.FlagValue{}, featureGatesFlag,
"Comma-delimited list of feature gate identifiers. Prefix with '-' to disable the feature. '+' or no prefix will enable the feature.")

flagSet.BoolVar(&dryRun, dryRunFlag, false, "Validate configuration without running the calculator, must be used with the config flag!")
return flagSet
}

Expand Down
43 changes: 22 additions & 21 deletions service/internal/configunmarshaler/defaultunmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,32 @@ func New() ConfigUnmarshaler {
return ConfigUnmarshaler{}
}

var rawCfg = configSettings{
// TODO: Add a component.ServiceFactory to allow this to be defined by the Service.
Service: config.Service{
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]interface{}(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: ":8888",
},
},
},
}

// Unmarshal the config.Config from a confmap.Conf.
// After the config is unmarshalled, `Validate()` must be called to validate.
func (ConfigUnmarshaler) Unmarshal(v *confmap.Conf, factories component.Factories) (*config.Config, error) {
// Unmarshal top level sections and validate.
rawCfg := configSettings{
// TODO: Add a component.ServiceFactory to allow this to be defined by the Service.
Service: config.Service{
Telemetry: telemetry.Config{
Logs: telemetry.LogsConfig{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
DisableCaller: false,
DisableStacktrace: false,
InitialFields: map[string]interface{}(nil),
},
Metrics: telemetry.MetricsConfig{
Level: configtelemetry.LevelBasic,
Address: ":8888",
},
},
},
}
if err := v.Unmarshal(&rawCfg, confmap.WithErrorUnused()); err != nil {
return nil, configError{
error: fmt.Errorf("error reading top level configuration sections: %w", err),
Expand Down
Loading

0 comments on commit dd53f75

Please sign in to comment.