Skip to content

Commit

Permalink
Move config.Config and config.Service to serviceconfig
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Dec 22, 2021
1 parent 7de98eb commit edb2a4f
Show file tree
Hide file tree
Showing 24 changed files with 308 additions and 271 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Remove deprecated consumererror.Combine (#4597)
- Remove `configmapprovider.NewDefault`, `configmapprovider.NewExpand`, `configmapprovider.NewMerge` (#4600)
- Move `configtest.LoadConfig` and `configtest.LoadConfigAndValidate` to `servicetest` (#4606)
- Move `config.Config` and `config.Service` to `serviceconfig` (#4608)

## 💡 Enhancements 💡

Expand Down
150 changes: 14 additions & 136 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,146 +14,24 @@

package config // import "go.opentelemetry.io/collector/config"

import (
"errors"
"fmt"
)

var (
errMissingExporters = errors.New("no enabled exporters specified in config")
errMissingReceivers = errors.New("no enabled receivers specified in config")
errMissingServicePipelines = errors.New("service must have at least one pipeline")
)

// Config defines the configuration for the various elements of collector or agent.
type Config struct {
// Receivers is a map of ComponentID to Receivers.
Receivers map[ComponentID]Receiver

// Exporters is a map of ComponentID to Exporters.
Exporters map[ComponentID]Exporter

// Processors is a map of ComponentID to Processors.
Processors map[ComponentID]Processor

// Extensions is a map of ComponentID to extensions.
Extensions map[ComponentID]Extension

Service
}

var _ validatable = (*Config)(nil)

// Validate returns an error if the config is invalid.
//
// This function performs basic validation of configuration. There may be more subtle
// invalid cases that we currently don't check for but which we may want to add in
// the future (e.g. disallowing receiving and exporting on the same endpoint).
func (cfg *Config) Validate() error {
// Currently, there is no default receiver enabled.
// The configuration must specify at least one receiver to be valid.
if len(cfg.Receivers) == 0 {
return errMissingReceivers
}

// Validate the receiver configuration.
for recvID, recvCfg := range cfg.Receivers {
if err := recvCfg.Validate(); err != nil {
return fmt.Errorf("receiver %q has invalid configuration: %w", recvID, err)
}
}

// Currently, there is no default exporter enabled.
// The configuration must specify at least one exporter to be valid.
if len(cfg.Exporters) == 0 {
return errMissingExporters
}

// Validate the exporter configuration.
for expID, expCfg := range cfg.Exporters {
if err := expCfg.Validate(); err != nil {
return fmt.Errorf("exporter %q has invalid configuration: %w", expID, err)
}
}

// Validate the processor configuration.
for procID, procCfg := range cfg.Processors {
if err := procCfg.Validate(); err != nil {
return fmt.Errorf("processor %q has invalid configuration: %w", procID, err)
}
}

// Validate the extension configuration.
for extID, extCfg := range cfg.Extensions {
if err := extCfg.Validate(); err != nil {
return fmt.Errorf("extension %q has invalid configuration: %w", extID, err)
}
}

return cfg.validateService()
}

func (cfg *Config) validateService() error {
// Validate Telemetry
if err := cfg.Service.Telemetry.validate(); err != nil {
return err
}

// Check that all enabled extensions in the service are configured.
for _, ref := range cfg.Service.Extensions {
// Check that the name referenced in the Service extensions exists in the top-level extensions.
if cfg.Extensions[ref] == nil {
return fmt.Errorf("service references extension %q which does not exist", ref)
}
}

// Must have at least one pipeline.
if len(cfg.Service.Pipelines) == 0 {
return errMissingServicePipelines
}

// Check that all pipelines have at least one receiver and one exporter, and they reference
// only configured components.
for pipelineID, pipeline := range cfg.Service.Pipelines {
// Validate pipeline has at least one receiver.
if len(pipeline.Receivers) == 0 {
return fmt.Errorf("pipeline %q must have at least one receiver", pipelineID)
}

// Validate pipeline receiver name references.
for _, ref := range pipeline.Receivers {
// Check that the name referenced in the pipeline's receivers exists in the top-level receivers.
if cfg.Receivers[ref] == nil {
return fmt.Errorf("pipeline %q references receiver %q which does not exist", pipelineID, ref)
}
}
// Type is the component type as it is used in the config.
type Type string

// Validate pipeline processor name references.
for _, ref := range pipeline.Processors {
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
if cfg.Processors[ref] == nil {
return fmt.Errorf("pipeline %q references processor %q which does not exist", pipelineID, ref)
}
}
// DataType is a special Type that represents the data types supported by the collector. We currently support
// collecting metrics, traces and logs, this can expand in the future.
type DataType = Type

// Validate pipeline has at least one exporter.
if len(pipeline.Exporters) == 0 {
return fmt.Errorf("pipeline %q must have at least one exporter", pipelineID)
}
// Currently supported data types. Add new data types here when new types are supported in the future.
const (
// TracesDataType is the data type tag for traces.
TracesDataType DataType = "traces"

// Validate pipeline exporter name references.
for _, ref := range pipeline.Exporters {
// Check that the name referenced in the pipeline's Exporters exists in the top-level Exporters.
if cfg.Exporters[ref] == nil {
return fmt.Errorf("pipeline %q references exporter %q which does not exist", pipelineID, ref)
}
}
}
return nil
}
// MetricsDataType is the data type tag for metrics.
MetricsDataType DataType = "metrics"

// Type is the component type as it is used in the config.
type Type string
// LogsDataType is the data type tag for logs.
LogsDataType DataType = "logs"
)

// validatable defines the interface for the configuration validation.
type validatable interface {
Expand Down
5 changes: 3 additions & 2 deletions config/configtest/configtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,20 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmapprovider"
"go.opentelemetry.io/collector/service/serviceconfig"
"go.opentelemetry.io/collector/service/servicetest"
)

// The regular expression for valid config field tag.
var configFieldTagRegExp = regexp.MustCompile("^[a-z0-9][a-z0-9_]*$")

// Deprecated: use servicetest.LoadConfig
func LoadConfig(fileName string, factories component.Factories) (*config.Config, error) {
func LoadConfig(fileName string, factories component.Factories) (*serviceconfig.Config, error) {
return servicetest.LoadConfig(fileName, factories)
}

// Deprecated: use servicetest.LoadConfigAndValidate
func LoadConfigAndValidate(fileName string, factories component.Factories) (*config.Config, error) {
func LoadConfigAndValidate(fileName string, factories component.Factories) (*serviceconfig.Config, error) {
return servicetest.LoadConfigAndValidate(fileName, factories)
}

Expand Down
17 changes: 9 additions & 8 deletions config/configunmarshaler/defaultunmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/serviceconfig"
)

// These are errors that can be returned by Unmarshal(). Note that error codes are not part
Expand Down Expand Up @@ -84,8 +85,8 @@ func NewDefault() ConfigUnmarshaler {

// Unmarshal the Config from a config.Map.
// After the config is unmarshalled, `Validate()` must be called to validate.
func (*defaultUnmarshaler) Unmarshal(v *config.Map, factories component.Factories) (*config.Config, error) {
var cfg config.Config
func (*defaultUnmarshaler) Unmarshal(v *config.Map, factories component.Factories) (*serviceconfig.Config, error) {
var cfg serviceconfig.Config

// Unmarshal top level sections and validate.
rawCfg := configSettings{}
Expand Down Expand Up @@ -163,12 +164,12 @@ func unmarshalExtensions(exts map[config.ComponentID]map[string]interface{}, fac
return extensions, nil
}

func unmarshalService(srvRaw map[string]interface{}) (config.Service, error) {
func unmarshalService(srvRaw map[string]interface{}) (serviceconfig.Service, error) {
// Setup default telemetry values as in service/logger.go.
// TODO: Add a component.ServiceFactory to allow this to be defined by the Service.
srv := config.Service{
Telemetry: config.ServiceTelemetry{
Logs: config.ServiceTelemetryLogs{
srv := serviceconfig.Service{
Telemetry: serviceconfig.ServiceTelemetry{
Logs: serviceconfig.ServiceTelemetryLogs{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
Expand All @@ -194,7 +195,7 @@ func unmarshalService(srvRaw map[string]interface{}) (config.Service, error) {
return srv, nil
}

func defaultServiceTelemetryMetricsSettings() config.ServiceTelemetryMetrics {
func defaultServiceTelemetryMetricsSettings() serviceconfig.ServiceTelemetryMetrics {
// These deprecated functions are still needed here so that the values provided through the CLI flags
// can be used as a baseline if no values are provided in configuration. This will eventually return
// a static default configuration when the CLI flags are removed.
Expand All @@ -203,7 +204,7 @@ func defaultServiceTelemetryMetricsSettings() config.ServiceTelemetryMetrics {
addr = configtelemetry.GetMetricsAddrDefault() //nolint:staticcheck
}

return config.ServiceTelemetryMetrics{
return serviceconfig.ServiceTelemetryMetrics{
Level: configtelemetry.GetMetricsLevelFlagValue(), //nolint:staticcheck
Address: addr,
}
Expand Down
13 changes: 7 additions & 6 deletions config/configunmarshaler/defaultunmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/testcomponents"
"go.opentelemetry.io/collector/service/serviceconfig"
)

func TestDecodeConfig(t *testing.T) {
Expand Down Expand Up @@ -101,8 +102,8 @@ func TestDecodeConfig(t *testing.T) {

// Verify Service Telemetry
assert.Equal(t,
config.ServiceTelemetry{
Logs: config.ServiceTelemetryLogs{
serviceconfig.ServiceTelemetry{
Logs: serviceconfig.ServiceTelemetryLogs{
Level: zapcore.DebugLevel,
Development: true,
Encoding: "console",
Expand All @@ -112,7 +113,7 @@ func TestDecodeConfig(t *testing.T) {
ErrorOutputPaths: []string{"stderr", "./error-output-logs"},
InitialFields: map[string]interface{}{"field_key": "filed_value"},
},
Metrics: config.ServiceTelemetryMetrics{
Metrics: serviceconfig.ServiceTelemetryMetrics{
Level: configtelemetry.LevelNormal,
Address: ":8081",
},
Expand All @@ -127,7 +128,7 @@ func TestDecodeConfig(t *testing.T) {
assert.Equal(t, 1, len(cfg.Service.Pipelines), "Incorrect pipelines count")

assert.Equal(t,
&config.Pipeline{
&serviceconfig.Pipeline{
Receivers: []config.ComponentID{config.NewComponentID("examplereceiver")},
Processors: []config.ComponentID{config.NewComponentID("exampleprocessor")},
Exporters: []config.ComponentID{config.NewComponentID("exampleexporter")},
Expand Down Expand Up @@ -226,7 +227,7 @@ func TestLoadEmptyAllSections(t *testing.T) {
assert.NoError(t, err)
}

func loadConfigFile(t *testing.T, fileName string, factories component.Factories) (*config.Config, error) {
func loadConfigFile(t *testing.T, fileName string, factories component.Factories) (*serviceconfig.Config, error) {
v, err := configmapprovider.NewFile(fileName).Retrieve(context.Background(), nil)
require.NoError(t, err)
cm, err := v.Get(context.Background())
Expand All @@ -245,7 +246,7 @@ func TestDefaultLoggerConfig(t *testing.T) {

zapProdCfg := zap.NewProductionConfig()
assert.Equal(t,
config.ServiceTelemetryLogs{
serviceconfig.ServiceTelemetryLogs{
Level: zapProdCfg.Level.Level(),
Development: zapProdCfg.Development,
Encoding: "console",
Expand Down
3 changes: 2 additions & 1 deletion config/configunmarshaler/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ package configunmarshaler // import "go.opentelemetry.io/collector/config/config
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/serviceconfig"
)

// ConfigUnmarshaler is the interface that unmarshalls the collector configuration from the config.Map.
type ConfigUnmarshaler interface {
// Unmarshal the configuration from the given parser and factories.
Unmarshal(v *config.Map, factories component.Factories) (*config.Config, error)
Unmarshal(v *config.Map, factories component.Factories) (*serviceconfig.Config, error)
}
8 changes: 4 additions & 4 deletions service/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configmapprovider"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/internal/configprovider"
"go.opentelemetry.io/collector/service/serviceconfig"
)

// ConfigProvider provides the service configuration.
Expand All @@ -43,7 +43,7 @@ type ConfigProvider interface {
// Get returns the service configuration, or error otherwise.
//
// Should never be called concurrently with itself, Watch or Shutdown.
Get(ctx context.Context, factories component.Factories) (*config.Config, error)
Get(ctx context.Context, factories component.Factories) (*serviceconfig.Config, error)

// Watch blocks until any configuration change was detected or an unrecoverable error
// happened during monitoring the configuration changes.
Expand Down Expand Up @@ -92,7 +92,7 @@ func NewDefaultConfigProvider(configFileName string, properties []string) Config
}
}

func (cm *configProvider) Get(ctx context.Context, factories component.Factories) (*config.Config, error) {
func (cm *configProvider) Get(ctx context.Context, factories component.Factories) (*serviceconfig.Config, error) {
// First check if already an active watching, close that if any.
if err := cm.closeIfNeeded(ctx); err != nil {
return nil, fmt.Errorf("cannot close previous watch: %w", err)
Expand All @@ -106,7 +106,7 @@ func (cm *configProvider) Get(ctx context.Context, factories component.Factories
return nil, fmt.Errorf("cannot retrieve the configuration: %w", err)
}

var cfg *config.Config
var cfg *serviceconfig.Config
m, err := cm.ret.Get(ctx)
if err != nil {
return nil, fmt.Errorf("cannot get the configuration: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion service/config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.opentelemetry.io/collector/config/configmapprovider"
"go.opentelemetry.io/collector/config/configunmarshaler"
"go.opentelemetry.io/collector/config/experimental/configsource"
"go.opentelemetry.io/collector/service/serviceconfig"
)

type errConfigMapProvider struct {
Expand All @@ -52,7 +53,7 @@ type errConfigUnmarshaler struct {
err error
}

func (ecu *errConfigUnmarshaler) Unmarshal(*config.Map, component.Factories) (*config.Config, error) {
func (ecu *errConfigUnmarshaler) Unmarshal(*config.Map, component.Factories) (*serviceconfig.Config, error) {
return nil, ecu.err
}

Expand Down
5 changes: 3 additions & 2 deletions service/internal/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/internal/components"
"go.opentelemetry.io/collector/service/serviceconfig"
)

// builtExporter is an exporter that is built based on a config. It can have
Expand Down Expand Up @@ -133,7 +134,7 @@ type exportersRequiredDataTypes map[config.ComponentID]dataTypeRequirements
func BuildExporters(
settings component.TelemetrySettings,
buildInfo component.BuildInfo,
cfg *config.Config,
cfg *serviceconfig.Config,
factories map[config.Type]component.ExporterFactory,
) (Exporters, error) {
logger := settings.Logger.With(zap.String(components.ZapKindKey, components.ZapKindLogExporter))
Expand Down Expand Up @@ -172,7 +173,7 @@ func BuildExporters(
return exporters, nil
}

func calcExportersRequiredDataTypes(cfg *config.Config) exportersRequiredDataTypes {
func calcExportersRequiredDataTypes(cfg *serviceconfig.Config) exportersRequiredDataTypes {
// Go over all pipelines. The data type of the pipeline defines what data type
// each exporter is expected to receive. Collect all required types for each
// exporter.
Expand Down
Loading

0 comments on commit edb2a4f

Please sign in to comment.