Skip to content

Commit

Permalink
Move config.Config and config.Service to serviceconfig
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 19, 2022
1 parent 70271f2 commit 0c20627
Show file tree
Hide file tree
Showing 24 changed files with 278 additions and 210 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
in 0.11.0 are no longer converted to the messages and fields that replaced the deprecated ones.
Received deprecated messages and fields will be now ignored. In OTLP/JSON in the
instrumentationLibraryLogs object the "logs" field is now named "logRecords" (#4724)
- Move `config.Config` and `config.Service` to `serviceconfig` (#4608)

### 💡 Enhancements 💡

Expand Down
145 changes: 14 additions & 131 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,141 +14,24 @@

package config // import "go.opentelemetry.io/collector/config"

import (
"errors"
"fmt"
)

var (
errMissingExporters = errors.New("no enabled exporters specified in config")
errMissingReceivers = errors.New("no enabled receivers specified in config")
errMissingServicePipelines = errors.New("service must have at least one pipeline")
)

// Config defines the configuration for the various elements of collector or agent.
type Config struct {
// Receivers is a map of ComponentID to Receivers.
Receivers map[ComponentID]Receiver

// Exporters is a map of ComponentID to Exporters.
Exporters map[ComponentID]Exporter

// Processors is a map of ComponentID to Processors.
Processors map[ComponentID]Processor

// Extensions is a map of ComponentID to extensions.
Extensions map[ComponentID]Extension

Service
}

var _ validatable = (*Config)(nil)

// Validate returns an error if the config is invalid.
//
// This function performs basic validation of configuration. There may be more subtle
// invalid cases that we currently don't check for but which we may want to add in
// the future (e.g. disallowing receiving and exporting on the same endpoint).
func (cfg *Config) Validate() error {
// Currently, there is no default receiver enabled.
// The configuration must specify at least one receiver to be valid.
if len(cfg.Receivers) == 0 {
return errMissingReceivers
}

// Validate the receiver configuration.
for recvID, recvCfg := range cfg.Receivers {
if err := recvCfg.Validate(); err != nil {
return fmt.Errorf("receiver %q has invalid configuration: %w", recvID, err)
}
}

// Currently, there is no default exporter enabled.
// The configuration must specify at least one exporter to be valid.
if len(cfg.Exporters) == 0 {
return errMissingExporters
}

// Validate the exporter configuration.
for expID, expCfg := range cfg.Exporters {
if err := expCfg.Validate(); err != nil {
return fmt.Errorf("exporter %q has invalid configuration: %w", expID, err)
}
}

// Validate the processor configuration.
for procID, procCfg := range cfg.Processors {
if err := procCfg.Validate(); err != nil {
return fmt.Errorf("processor %q has invalid configuration: %w", procID, err)
}
}

// Validate the extension configuration.
for extID, extCfg := range cfg.Extensions {
if err := extCfg.Validate(); err != nil {
return fmt.Errorf("extension %q has invalid configuration: %w", extID, err)
}
}

return cfg.validateService()
}

func (cfg *Config) validateService() error {
// Check that all enabled extensions in the service are configured.
for _, ref := range cfg.Service.Extensions {
// Check that the name referenced in the Service extensions exists in the top-level extensions.
if cfg.Extensions[ref] == nil {
return fmt.Errorf("service references extension %q which does not exist", ref)
}
}

// Must have at least one pipeline.
if len(cfg.Service.Pipelines) == 0 {
return errMissingServicePipelines
}

// Check that all pipelines have at least one receiver and one exporter, and they reference
// only configured components.
for pipelineID, pipeline := range cfg.Service.Pipelines {
// Validate pipeline has at least one receiver.
if len(pipeline.Receivers) == 0 {
return fmt.Errorf("pipeline %q must have at least one receiver", pipelineID)
}

// Validate pipeline receiver name references.
for _, ref := range pipeline.Receivers {
// Check that the name referenced in the pipeline's receivers exists in the top-level receivers.
if cfg.Receivers[ref] == nil {
return fmt.Errorf("pipeline %q references receiver %q which does not exist", pipelineID, ref)
}
}
// Type is the component type as it is used in the config.
type Type string

// Validate pipeline processor name references.
for _, ref := range pipeline.Processors {
// Check that the name referenced in the pipeline's processors exists in the top-level processors.
if cfg.Processors[ref] == nil {
return fmt.Errorf("pipeline %q references processor %q which does not exist", pipelineID, ref)
}
}
// DataType is a special Type that represents the data types supported by the collector. We currently support
// collecting metrics, traces and logs, this can expand in the future.
type DataType = Type

// Validate pipeline has at least one exporter.
if len(pipeline.Exporters) == 0 {
return fmt.Errorf("pipeline %q must have at least one exporter", pipelineID)
}
// Currently supported data types. Add new data types here when new types are supported in the future.
const (
// TracesDataType is the data type tag for traces.
TracesDataType DataType = "traces"

// Validate pipeline exporter name references.
for _, ref := range pipeline.Exporters {
// Check that the name referenced in the pipeline's Exporters exists in the top-level Exporters.
if cfg.Exporters[ref] == nil {
return fmt.Errorf("pipeline %q references exporter %q which does not exist", pipelineID, ref)
}
}
}
return nil
}
// MetricsDataType is the data type tag for metrics.
MetricsDataType DataType = "metrics"

// Type is the component type as it is used in the config.
type Type string
// LogsDataType is the data type tag for logs.
LogsDataType DataType = "logs"
)

// validatable defines the interface for the configuration validation.
type validatable interface {
Expand Down
17 changes: 9 additions & 8 deletions config/configunmarshaler/defaultunmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/serviceconfig"
)

// These are errors that can be returned by Unmarshal(). Note that error codes are not part
Expand Down Expand Up @@ -85,8 +86,8 @@ func NewDefault() ConfigUnmarshaler {

// Unmarshal the Config from a config.Map.
// After the config is unmarshalled, `Validate()` must be called to validate.
func (*defaultUnmarshaler) Unmarshal(v *config.Map, factories component.Factories) (*config.Config, error) {
var cfg config.Config
func (*defaultUnmarshaler) Unmarshal(v *config.Map, factories component.Factories) (*serviceconfig.Config, error) {
var cfg serviceconfig.Config

// Unmarshal top level sections and validate.
rawCfg := configSettings{}
Expand Down Expand Up @@ -164,12 +165,12 @@ func unmarshalExtensions(exts map[config.ComponentID]map[string]interface{}, fac
return extensions, nil
}

func unmarshalService(srvRaw map[string]interface{}) (config.Service, error) {
func unmarshalService(srvRaw map[string]interface{}) (serviceconfig.Service, error) {
// Setup default telemetry values as in service/logger.go.
// TODO: Add a component.ServiceFactory to allow this to be defined by the Service.
srv := config.Service{
Telemetry: config.ServiceTelemetry{
Logs: config.ServiceTelemetryLogs{
srv := serviceconfig.Service{
Telemetry: serviceconfig.ServiceTelemetry{
Logs: serviceconfig.ServiceTelemetryLogs{
Level: zapcore.InfoLevel,
Development: false,
Encoding: "console",
Expand All @@ -195,8 +196,8 @@ func unmarshalService(srvRaw map[string]interface{}) (config.Service, error) {
return srv, nil
}

func defaultServiceTelemetryMetricsSettings() config.ServiceTelemetryMetrics {
return config.ServiceTelemetryMetrics{
func defaultServiceTelemetryMetricsSettings() serviceconfig.ServiceTelemetryMetrics {
return serviceconfig.ServiceTelemetryMetrics{
Level: configtelemetry.LevelBasic, //nolint:staticcheck
Address: ":8888",
}
Expand Down
13 changes: 7 additions & 6 deletions config/configunmarshaler/defaultunmarshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/internal/testcomponents"
"go.opentelemetry.io/collector/service/serviceconfig"
)

func TestDecodeConfig(t *testing.T) {
Expand Down Expand Up @@ -100,8 +101,8 @@ func TestDecodeConfig(t *testing.T) {

// Verify Service Telemetry
assert.Equal(t,
config.ServiceTelemetry{
Logs: config.ServiceTelemetryLogs{
serviceconfig.ServiceTelemetry{
Logs: serviceconfig.ServiceTelemetryLogs{
Level: zapcore.DebugLevel,
Development: true,
Encoding: "console",
Expand All @@ -111,7 +112,7 @@ func TestDecodeConfig(t *testing.T) {
ErrorOutputPaths: []string{"stderr", "./error-output-logs"},
InitialFields: map[string]interface{}{"field_key": "filed_value"},
},
Metrics: config.ServiceTelemetryMetrics{
Metrics: serviceconfig.ServiceTelemetryMetrics{
Level: configtelemetry.LevelNormal,
Address: ":8081",
},
Expand All @@ -126,7 +127,7 @@ func TestDecodeConfig(t *testing.T) {
assert.Equal(t, 1, len(cfg.Service.Pipelines), "Incorrect pipelines count")

assert.Equal(t,
&config.Pipeline{
&serviceconfig.Pipeline{
Receivers: []config.ComponentID{config.NewComponentID("examplereceiver")},
Processors: []config.ComponentID{config.NewComponentID("exampleprocessor")},
Exporters: []config.ComponentID{config.NewComponentID("exampleexporter")},
Expand Down Expand Up @@ -225,7 +226,7 @@ func TestLoadEmptyAllSections(t *testing.T) {
assert.NoError(t, err)
}

func loadConfigFile(t *testing.T, fileName string, factories component.Factories) (*config.Config, error) {
func loadConfigFile(t *testing.T, fileName string, factories component.Factories) (*serviceconfig.Config, error) {
cm, err := configtest.LoadConfigMap(fileName)
require.NoError(t, err)

Expand All @@ -242,7 +243,7 @@ func TestDefaultLoggerConfig(t *testing.T) {

zapProdCfg := zap.NewProductionConfig()
assert.Equal(t,
config.ServiceTelemetryLogs{
serviceconfig.ServiceTelemetryLogs{
Level: zapProdCfg.Level.Level(),
Development: zapProdCfg.Development,
Encoding: "console",
Expand Down
3 changes: 2 additions & 1 deletion config/configunmarshaler/unmarshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ package configunmarshaler // import "go.opentelemetry.io/collector/config/config
import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/serviceconfig"
)

// ConfigUnmarshaler is the interface that unmarshalls the collector configuration from the config.Map.
type ConfigUnmarshaler interface {
// Unmarshal the configuration from the given parser and factories.
Unmarshal(v *config.Map, factories component.Factories) (*config.Config, error)
Unmarshal(v *config.Map, factories component.Factories) (*serviceconfig.Config, error)
}
Loading

0 comments on commit 0c20627

Please sign in to comment.