Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge hardcoded/default configuration with OTEL config file #2211

Merged
merged 13 commits into from
May 4, 2020
2 changes: 1 addition & 1 deletion cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Config(storageType string, zipkinHostPort string, factories config.Factorie
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: map[string]*configmodels.Pipeline{
Pipelines: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: recTypes,
Expand Down
15 changes: 9 additions & 6 deletions cmd/opentelemetry-collector/app/defaults/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package defaults

import (
"fmt"
"sort"
"testing"

Expand All @@ -38,14 +39,14 @@ func TestDefaultConfig(t *testing.T) {
storageType string
zipkinHostPort string
exporterTypes []string
pipeline map[string]*configmodels.Pipeline
pipeline configmodels.Pipelines
err string
}{
{
storageType: "elasticsearch",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{elasticsearch.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Expand All @@ -57,7 +58,7 @@ func TestDefaultConfig(t *testing.T) {
storageType: "cassandra",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{cassandra.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Expand All @@ -69,7 +70,7 @@ func TestDefaultConfig(t *testing.T) {
storageType: "kafka",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{kafka.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Expand All @@ -81,7 +82,7 @@ func TestDefaultConfig(t *testing.T) {
storageType: "cassandra,elasticsearch",
zipkinHostPort: disabledHostPort,
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Expand All @@ -93,7 +94,7 @@ func TestDefaultConfig(t *testing.T) {
storageType: "cassandra",
zipkinHostPort: ":9411",
exporterTypes: []string{cassandra.TypeStr},
pipeline: map[string]*configmodels.Pipeline{
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger", "zipkin"},
Expand Down Expand Up @@ -131,6 +132,8 @@ func TestDefaultConfig(t *testing.T) {
}
sort.Strings(types)
assert.Equal(t, test.exporterTypes, types)
fmt.Printf("expected %v\n", test.pipeline)
fmt.Printf("actual %v\n", cfg.Service.Pipelines)
assert.EqualValues(t, test.pipeline, cfg.Service.Pipelines)
})
}
Expand Down
66 changes: 66 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package defaults

import (
"reflect"

"github.com/imdario/mergo"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
)

// MergeConfigs merges two configs.
// The src is merged into dst.
func MergeConfigs(dst, src *configmodels.Config) error {
if src == nil {
return nil
}
err := mergo.Merge(dst, src,
mergo.WithOverride,
mergo.WithAppendSlice,
mergo.WithTransformers(transformerSliceUniqueValues{}))
if err != nil {
return err
}
return nil
}

// transformerSliceUniqueValues merges unique values of two string slices.
type transformerSliceUniqueValues struct {
}

func (st transformerSliceUniqueValues) Transformer(typ reflect.Type) func(dst, src reflect.Value) error {
if typ == reflect.TypeOf([]string{}) {
return func(dst, src reflect.Value) error {
dst.Interface()
if dst.CanSet() {
dstVal := dst.Interface().([]string)
m := map[string]bool{}
for _, v := range dstVal {
m[v] = true
}
srcVal := src.Interface().([]string)
for _, v := range srcVal {
if !m[v] {
srcVal = append(srcVal, v)
dst.Set(reflect.Append(dst, reflect.ValueOf(v)))
}
}
}
return nil
}
}
return nil
}
176 changes: 176 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright (c) 2020 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package defaults

import (
"fmt"
"testing"

"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
"github.com/open-telemetry/opentelemetry-collector/processor/batchprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
config2 "github.com/jaegertracing/jaeger/pkg/config"
)

func TestMergeConfigs_nil(t *testing.T) {
cfg := &configmodels.Config{
Receivers: configmodels.Receivers{
"jaeger": &jaegerreceiver.Config{
RemoteSampling: &jaegerreceiver.RemoteSamplingConfig{StrategyFile: "file.json"},
},
},
}
err := MergeConfigs(cfg, nil)
require.NoError(t, err)
assert.Equal(t, cfg, cfg)
}

func TestMergeConfigs(t *testing.T) {
cfg := &configmodels.Config{
Receivers: configmodels.Receivers{
"jaeger": &jaegerreceiver.Config{
Protocols: map[string]*receiver.SecureReceiverSettings{
"grpc": {ReceiverSettings: configmodels.ReceiverSettings{Endpoint: "def"}},
"thrift_compact": {ReceiverSettings: configmodels.ReceiverSettings{Endpoint: "def"}},
},
},
},
Processors: configmodels.Processors{
"batch": &batchprocessor.Config{
SendBatchSize: uint32(160),
},
},
Service: configmodels.Service{
Extensions: []string{"def", "def2"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
Receivers: []string{"jaeger"},
Processors: []string{"batch"},
},
},
},
}
overrideCfg := &configmodels.Config{
Receivers: configmodels.Receivers{
"jaeger": &jaegerreceiver.Config{
Protocols: map[string]*receiver.SecureReceiverSettings{
"grpc": {ReceiverSettings: configmodels.ReceiverSettings{Endpoint: "master_jaeger_url", Disabled: true}},
},
},
"zipkin": &zipkinreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "master_zipkin_url",
},
},
},
Processors: configmodels.Processors{
"attributes": &attributesprocessor.Config{
Actions: []attributesprocessor.ActionKeyValue{{Key: "foo"}},
},
},
Service: configmodels.Service{
Extensions: []string{"def", "master1", "master2"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
Receivers: []string{"jaeger", "zipkin"},
Processors: []string{"attributes"},
},
"traces/2": &configmodels.Pipeline{
Processors: []string{"example"},
},
},
},
}
expected := &configmodels.Config{
Receivers: configmodels.Receivers{
"jaeger": &jaegerreceiver.Config{
Protocols: map[string]*receiver.SecureReceiverSettings{
"grpc": {ReceiverSettings: configmodels.ReceiverSettings{Endpoint: "master_jaeger_url", Disabled: true}},
"thrift_compact": {ReceiverSettings: configmodels.ReceiverSettings{Endpoint: "def"}},
},
},
"zipkin": &zipkinreceiver.Config{
ReceiverSettings: configmodels.ReceiverSettings{
Endpoint: "master_zipkin_url",
},
},
},
Processors: configmodels.Processors{
"batch": &batchprocessor.Config{
SendBatchSize: uint32(160),
},
"attributes": &attributesprocessor.Config{
Actions: []attributesprocessor.ActionKeyValue{{Key: "foo"}},
},
},
Service: configmodels.Service{
Extensions: []string{"def", "def2", "master1", "master2"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
Receivers: []string{"jaeger", "zipkin"},
Processors: []string{"batch", "attributes"},
},
"traces/2": &configmodels.Pipeline{
Processors: []string{"example"},
},
},
},
}
err := MergeConfigs(cfg, overrideCfg)
require.NoError(t, err)
assert.Equal(t, expected, cfg)
}

func TestMergeConfigFiles(t *testing.T) {
testFiles := []string{"emptyoverride", "addprocessor", "multiplecomponents"}
v, _ := config2.Viperize(elasticsearch.DefaultOptions().AddFlags)
cmpts := Components(v)
for _, f := range testFiles {
t.Run(f, func(t *testing.T) {
cfg, err := loadConfig(cmpts, fmt.Sprintf("testdata/%s.yaml", f))
require.NoError(t, err)
override, err := loadConfig(cmpts, fmt.Sprintf("testdata/%s-override.yaml", f))
require.NoError(t, err)
merged, err := loadConfig(cmpts, fmt.Sprintf("testdata/%s-merged.yaml", f))
require.NoError(t, err)
err = MergeConfigs(cfg, override)
require.NoError(t, err)
assert.Equal(t, merged, cfg)
})
}
}

func loadConfig(factories config.Factories, file string) (*configmodels.Config, error) {
// config.Load fails to load an empty config
if file == "testdata/emptyoverride-override.yaml" {
return &configmodels.Config{}, nil
}
v := viper.New()
v.SetConfigFile(file)
err := v.ReadInConfig()
if err != nil {
return nil, fmt.Errorf("error loading config file %q: %v", file, err)
}
return config.Load(v, factories)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
receivers:
jaeger:
protocols:
grpc:
thrift_compact:
thrift_binary:

processors:
queued_retry: {}
batch:
timeout: 5s

exporters:
jaeger_elasticsearch:

service:
pipelines:
traces:
receivers: [jaeger]
processors: [queued_retry, batch]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may highlight a potential issue in some situations - what if a user wants the processors to be performed in a particular order?
In this specific case, wouldn't the batch normally precede the queued_retry? So the spans are batched, and then queued/retried based on those new batches?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good comment. We should allow overriding the order.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In https://github.com/open-telemetry/opentelemetry-collector/pull/909` disabled flag has been removed from all components. Hence there is no way how a user could disable hardcoded/default component.

In this PR I wanted to support incremental addition of components. For instance if a user wants to add a processor it would just add it to services.pipelines.traces.processors without repeating the default/hardcoded processors. Since there is no way to disable a component and order of the components might be important we should require defining an exact order of the components in the pipeline.

Example: add attribute processor

processors:
  attributes:
    key: foo

service:
  pipelines:
    traces:
      processors: ["batch", "attributes"]

The batch has to be repeated otherwise it will not be included in the pipeline.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to be considered/implemented in a separate PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this PR, as this will have to change anyways.

exporters: [jaeger_elasticsearch]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
receivers:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't the receivers and exporters be removed now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not now. There is a comment in this file explaining why.

See also open-telemetry/opentelemetry-collector#888

jaeger:
protocols:
grpc:
# TODO OTEL config.Load() fails if there are no receiver/exporters
exporters:
jaeger_elasticsearch:
# TODO These two properties have to be overridden here if they were overridden in addprocessor.yaml
# If we do not override them here the default values from viper will override the values from "default/src" config.
# However in reality the default config is created from viper, hence when OTEL configuration is created it
# uses viper to create default values and then it sets the values from config.
# The viper is not used in tests hence we have to override it here.
#num_shards: 1
#num_replicas: 0

processors:
batch:
timeout: 5s

service:
pipelines:
traces:
processors: [batch]
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
receivers:
jaeger:
protocols:
grpc:
thrift_compact:
thrift_binary:

processors:
queued_retry: {}

exporters:
jaeger_elasticsearch:

service:
pipelines:
traces:
receivers: [jaeger]
processors: [queued_retry]
exporters: [jaeger_elasticsearch]
Loading