diff --git a/cmd/otelcol/main.go b/cmd/otelcol/main.go index a013f76263..c5b82b255f 100644 --- a/cmd/otelcol/main.go +++ b/cmd/otelcol/main.go @@ -72,6 +72,7 @@ func main() { log.Fatalf("failed to create discovery provider: %v", err) } + hooks := []configprovider.Hook{configServer, dryRun} envProvider := envprovider.New() fileProvider := fileprovider.New() serviceConfigProvider, err := otelcol.NewConfigProvider( @@ -82,16 +83,16 @@ func main() { discovery.ConfigDScheme(): configprovider.NewConfigSourceConfigMapProvider( discovery.ConfigDProvider(), zap.NewNop(), // The service logger is not available yet, setting it to Nop. - info, configServer, configsources.Get()..., + info, hooks, configsources.Get()..., ), discovery.DiscoveryModeScheme(): configprovider.NewConfigSourceConfigMapProvider( - discovery.DiscoveryModeProvider(), zap.NewNop(), info, configServer, configsources.Get()..., + discovery.DiscoveryModeProvider(), zap.NewNop(), info, hooks, configsources.Get()..., ), envProvider.Scheme(): configprovider.NewConfigSourceConfigMapProvider( - envProvider, zap.NewNop(), info, configServer, configsources.Get()..., + envProvider, zap.NewNop(), info, hooks, configsources.Get()..., ), fileProvider.Scheme(): configprovider.NewConfigSourceConfigMapProvider( - fileProvider, zap.NewNop(), info, configServer, configsources.Get()..., + fileProvider, zap.NewNop(), info, hooks, configsources.Get()..., ), }, Converters: confMapConverters, }, diff --git a/internal/configconverter/config_server.go b/internal/configconverter/config_server.go index b3868b0681..c2dd1a1fd4 100644 --- a/internal/configconverter/config_server.go +++ b/internal/configconverter/config_server.go @@ -31,6 +31,8 @@ import ( "github.com/spf13/cast" "go.opentelemetry.io/collector/confmap" "gopkg.in/yaml.v2" + + "github.com/signalfx/splunk-otel-collector/internal/configprovider" ) const ( @@ -49,6 +51,7 @@ const ( ) var _ confmap.Converter = (*ConfigServer)(nil) +var _ configprovider.Hook = (*ConfigServer)(nil) type ConfigServer struct { // Use get/set methods instead of direct usage @@ -95,14 +98,14 @@ func (cs *ConfigServer) Convert(_ context.Context, conf *confmap.Conf) error { return nil } -func (cs *ConfigServer) Register() { +func (cs *ConfigServer) OnNew() { cs.wg.Add(1) } -func (cs *ConfigServer) SetForScheme(scheme string, config map[string]any) { +func (cs *ConfigServer) OnRetrieve(scheme string, retrieved map[string]any) { cs.initialMutex.Lock() defer cs.initialMutex.Unlock() - cs.initial[scheme] = config + cs.initial[scheme] = retrieved } func (cs *ConfigServer) getInitial() map[string]any { @@ -123,8 +126,8 @@ func (cs *ConfigServer) getEffective() map[string]any { return cs.effective } -// start will create and start the singleton http server. It presumes cs.Register() has been -// called at least once and will tear down the moment the final cs.Unregister() call is made. +// start will create and start the singleton http server. It presumes cs.OnNew() has been +// called at least once and will tear down the moment the final cs.OnShutdown() call is made. func (cs *ConfigServer) start() { if enabled := os.Getenv(configServerEnabledEnvVar); enabled != "true" { // The config server needs to be explicitly enabled for the time being. @@ -174,7 +177,7 @@ func (cs *ConfigServer) start() { }) } -func (cs *ConfigServer) Unregister() { +func (cs *ConfigServer) OnShutdown() { cs.wg.Done() } diff --git a/internal/configconverter/config_server_test.go b/internal/configconverter/config_server_test.go index e69fb016c0..15f210b236 100644 --- a/internal/configconverter/config_server_test.go +++ b/internal/configconverter/config_server_test.go @@ -38,8 +38,8 @@ func TestConfigServer_RequireEnvVar(t *testing.T) { cs := NewConfigServer() require.NotNil(t, cs) - cs.Register() - t.Cleanup(cs.Unregister) + cs.OnNew() + t.Cleanup(cs.OnShutdown) require.NoError(t, cs.Convert(context.Background(), confmap.NewFromStringMap(initial))) client := &http.Client{} @@ -92,10 +92,10 @@ func TestConfigServer_EnvVar(t *testing.T) { cs := NewConfigServer() require.NotNil(t, cs) - cs.Register() + cs.OnNew() require.NoError(t, cs.Convert(context.Background(), confmap.NewFromStringMap(initial))) - defer cs.Unregister() + defer cs.OnShutdown() endpoint := tt.endpoint if endpoint == "" { @@ -147,10 +147,10 @@ func TestConfigServer_Serve(t *testing.T) { cs := NewConfigServer() require.NotNil(t, cs) - cs.Register() - t.Cleanup(cs.Unregister) + cs.OnNew() + t.Cleanup(cs.OnShutdown) - cs.SetForScheme("scheme", initial) + cs.OnRetrieve("scheme", initial) require.NoError(t, cs.Convert(context.Background(), confmap.NewFromStringMap(initial))) // Test for the pages to be actually valid YAML files. diff --git a/internal/configconverter/dry_run.go b/internal/configconverter/dry_run.go index e2de8eb683..bcf128c66f 100644 --- a/internal/configconverter/dry_run.go +++ b/internal/configconverter/dry_run.go @@ -18,32 +18,66 @@ import ( "context" "fmt" "os" + "sync" "go.opentelemetry.io/collector/confmap" "gopkg.in/yaml.v2" + + "github.com/signalfx/splunk-otel-collector/internal/configprovider" ) var _ confmap.Converter = (*DryRun)(nil) +var _ configprovider.Hook = (*DryRun)(nil) type DryRun struct { + *sync.Mutex + configs []map[string]any enabled bool } -func NewDryRun(enabled bool) DryRun { - return DryRun{enabled: enabled} +func NewDryRun(enabled bool) *DryRun { + return &DryRun{ + Mutex: &sync.Mutex{}, + enabled: enabled, + configs: []map[string]any{}, + } } -// Convert is intended to be called as the final service confmap.Converter so -// that it has access to the final config before exiting, if enabled. -func (dr DryRun) Convert(_ context.Context, conf *confmap.Conf) error { - if dr.enabled { - out, err := yaml.Marshal(conf.ToStringMap()) - if err != nil { - panic(fmt.Errorf("failed marshaling --dry-run config: %w", err)) +func (dr *DryRun) OnNew() {} + +func (dr *DryRun) OnRetrieve(_ string, retrieved map[string]any) { + if dr == nil || !dr.enabled { + return + } + dr.Lock() + defer dr.Unlock() + dr.configs = append(dr.configs, retrieved) +} + +func (dr *DryRun) OnShutdown() {} + +// Convert disregards the provided *confmap.Conf so that it will use +// unexpanded values (env vars, config source directives) as +// accrued by OnRetrieve() calls. +func (dr *DryRun) Convert(context.Context, *confmap.Conf) error { + if dr == nil || !dr.enabled { + return nil + } + cm := confmap.New() + dr.Lock() + for _, cfg := range dr.configs { + if err := cm.Merge(confmap.NewFromStringMap(cfg)); err != nil { + dr.Unlock() + return err } - fmt.Fprintf(os.Stdout, "%s", out) - os.Stdout.Sync() - os.Exit(0) } + dr.Unlock() // not deferred because we are exiting + out, err := yaml.Marshal(cm.ToStringMap()) + if err != nil { + panic(fmt.Errorf("failed marshaling --dry-run config: %w", err)) + } + fmt.Fprintf(os.Stdout, "%s", out) + os.Stdout.Sync() + os.Exit(0) return nil } diff --git a/internal/configconverter/dry_run_test.go b/internal/configconverter/dry_run_test.go index eafa5b9ebb..8733838e9e 100644 --- a/internal/configconverter/dry_run_test.go +++ b/internal/configconverter/dry_run_test.go @@ -27,13 +27,20 @@ import ( ) func TestDryRun(t *testing.T) { - config := confmap.NewFromStringMap(map[string]interface{}{"one": "one"}) + cfgOne := confmap.NewFromStringMap(map[string]any{"key.one": "value.one"}) dr := NewDryRun(false) + dr.OnNew() + defer func() { require.NotPanics(t, dr.OnShutdown) }() + + dr.OnRetrieve("some.scheme", cfgOne.ToStringMap()) require.NotPanics(t, func() { - dr.Convert(context.Background(), config) + dr.Convert(context.Background(), cfgOne) }) origStdOut := os.Stdout + t.Cleanup(func() { + os.Stdout = origStdOut + }) stdout, err := os.CreateTemp("", "stdout") require.NoError(t, err) require.NotNil(t, stdout) @@ -48,14 +55,17 @@ func TestDryRun(t *testing.T) { }()) dr = NewDryRun(true) + cfgTwo := confmap.NewFromStringMap(map[string]any{"key.two": "value.two"}) + dr.OnRetrieve("some.scheme", cfgOne.ToStringMap()) + dr.OnRetrieve("another.scheme", cfgTwo.ToStringMap()) require.Panics(t, func() { - dr.Convert(context.Background(), config) + dr.Convert(context.Background(), cfgTwo) }) - os.Stdout = origStdOut stdout.Seek(0, 0) out, err := io.ReadAll(stdout) require.NoError(t, err) actual := map[string]any{} yaml.Unmarshal(out, &actual) - require.Equal(t, config.ToStringMap(), actual) + expected := map[string]any{"key.one": "value.one", "key.two": "value.two"} + require.Equal(t, expected, actual) } diff --git a/internal/configprovider/config_source_provider.go b/internal/configprovider/config_source_provider.go index aef195af29..15bdba796a 100644 --- a/internal/configprovider/config_source_provider.go +++ b/internal/configprovider/config_source_provider.go @@ -22,17 +22,21 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.uber.org/zap" - - "github.com/signalfx/splunk-otel-collector/internal/configconverter" ) var ( _ confmap.Provider = (*configSourceConfigMapProvider)(nil) ) +type Hook interface { + OnNew() + OnRetrieve(scheme string, retrieved map[string]any) + OnShutdown() +} + type configSourceConfigMapProvider struct { logger *zap.Logger - configServer *configconverter.ConfigServer + hooks []Hook wrappedProvider confmap.Provider wrappedRetrieved *confmap.Retrieved buildInfo component.BuildInfo @@ -41,10 +45,12 @@ type configSourceConfigMapProvider struct { // NewConfigSourceConfigMapProvider creates a ParserProvider that uses config sources. func NewConfigSourceConfigMapProvider(wrappedProvider confmap.Provider, logger *zap.Logger, - buildInfo component.BuildInfo, configServer *configconverter.ConfigServer, factories ...Factory) confmap.Provider { - configServer.Register() + buildInfo component.BuildInfo, hooks []Hook, factories ...Factory) confmap.Provider { + for _, h := range hooks { + h.OnNew() + } return &configSourceConfigMapProvider{ - configServer: configServer, + hooks: hooks, wrappedProvider: wrappedProvider, logger: logger, factories: factories, @@ -89,7 +95,12 @@ func (c *configSourceConfigMapProvider) Retrieve(ctx context.Context, uri string if err != nil { return nil, err } - c.configServer.SetForScheme(c.Scheme(), wrappedMap.ToStringMap()) + + scheme, stringMap := c.Scheme(), wrappedMap.ToStringMap() + for _, h := range c.hooks { + h.OnRetrieve(scheme, stringMap) + } + retrieved, closeFunc, err := Resolve(ctx, wrappedMap, c.logger, c.buildInfo, factories, onChange) if err != nil { return nil, err @@ -103,7 +114,9 @@ func (c *configSourceConfigMapProvider) Scheme() string { } func (c *configSourceConfigMapProvider) Shutdown(ctx context.Context) error { - c.configServer.Unregister() + for _, h := range c.hooks { + h.OnShutdown() + } return c.wrappedProvider.Shutdown(ctx) } diff --git a/internal/configprovider/config_source_provider_test.go b/internal/configprovider/config_source_provider_test.go index 2ee7f9f462..1de7e0e1a8 100644 --- a/internal/configprovider/config_source_provider_test.go +++ b/internal/configprovider/config_source_provider_test.go @@ -22,13 +22,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.uber.org/zap" - - "github.com/signalfx/splunk-otel-collector/internal/configconverter" ) func TestConfigSourceConfigMapProvider(t *testing.T) { @@ -91,19 +90,36 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { } } + hookOne := &mockHook{} + hookTwo := &mockHook{} + hooks := []*mockHook{hookOne, hookTwo} + for _, h := range hooks { + h.On("OnNew") + h.On("OnRetrieve", mock.AnythingOfType("string"), mock.Anything) + h.On("OnShutdown") + } + pp := NewConfigSourceConfigMapProvider( &mockParserProvider{}, zap.NewNop(), component.NewDefaultBuildInfo(), - configconverter.NewConfigServer(), + []Hook{hookOne, hookTwo}, factories..., ) require.NotNil(t, pp) + for _, h := range hooks { + h.AssertCalled(t, "OnNew") + h.AssertNotCalled(t, "OnRetrieve") + h.AssertNotCalled(t, "OnShutdown") + } + + var expectedScheme string // Do not use the config.Default() to simplify the test setup. cspp := pp.(*configSourceConfigMapProvider) if tt.parserProvider != nil { cspp.wrappedProvider = tt.parserProvider + expectedScheme = tt.parserProvider.Scheme() } // Need to run Retrieve method no matter what, so we can't just iterate passed in config locations @@ -116,6 +132,7 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { configLocation = "" } r, err := pp.Retrieve(context.Background(), configLocation, nil) + if tt.wantErr == "" { require.NoError(t, err) require.NotNil(t, r) @@ -126,13 +143,26 @@ func TestConfigSourceConfigMapProvider(t *testing.T) { } else { assert.ErrorContains(t, err, tt.wantErr) assert.Nil(t, r) - return + break } i++ ok = i < len(tt.configLocation) } + for _, h := range hooks { + if tt.wantErr != "" { + h.AssertNotCalled(t, "OnRetrieve") + } else { + h.AssertCalled(t, "OnRetrieve", expectedScheme, mock.Anything) + } + h.AssertNotCalled(t, "OnShutdown") + } + assert.NoError(t, cspp.Shutdown(context.Background())) + + for _, h := range hooks { + h.AssertCalled(t, "OnShutdown") + } }) } } @@ -157,3 +187,21 @@ func (mpp *mockParserProvider) Shutdown(context.Context) error { func (mpp *mockParserProvider) Scheme() string { return "" } + +type mockHook struct { + mock.Mock +} + +var _ Hook = (*mockHook)(nil) + +func (m *mockHook) OnNew() { + m.Called() +} + +func (m *mockHook) OnRetrieve(scheme string, _ map[string]any) { + m.Called(scheme, mock.Anything) +} + +func (m *mockHook) OnShutdown() { + m.Called() +} diff --git a/pkg/extension/smartagentextension/config_windows_test.go b/pkg/extension/smartagentextension/config_windows_test.go index 9fed1a3bf9..630608f5e0 100644 --- a/pkg/extension/smartagentextension/config_windows_test.go +++ b/pkg/extension/smartagentextension/config_windows_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/extension" ) diff --git a/pkg/receiver/smartagent/config_windows_test.go b/pkg/receiver/smartagent/config_windows_test.go index fe6eb3fa28..6e1930ee86 100644 --- a/pkg/receiver/smartagent/config_windows_test.go +++ b/pkg/receiver/smartagent/config_windows_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap/confmaptest" ) func TestLoadUnsupportedCollectdMonitorOnWindows(t *testing.T) { diff --git a/tests/general/dry_run_test.go b/tests/general/dry_run_test.go new file mode 100644 index 0000000000..9ffec750ac --- /dev/null +++ b/tests/general/dry_run_test.go @@ -0,0 +1,96 @@ +// Copyright Splunk, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build integration + +package tests + +import ( + "context" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/signalfx/splunk-otel-collector/tests/testutils" +) + +func TestDryRunDoesntExpandEnvVars(t *testing.T) { + tc := testutils.NewTestcase(t) + defer tc.PrintLogsOnFailure() + defer tc.ShutdownOTLPReceiverSink() + + config := `config_sources: + env: + defaults: + COLLECTION_INTERVAL: 1s + OTLP_EXPORTER: otlp +exporters: + otlp: + endpoint: ${OTLP_ENDPOINT} + insecure: true +receivers: + hostmetrics: + collection_interval: ${env:COLLECTION_INTERVAL} + scrapers: + cpu: null +service: + pipelines: + metrics: + exporters: + - ${env:OTLP_EXPORTER} + receivers: + - ${HOST_METRICS_RECEIVER}` + + c, shutdown := tc.SplunkOtelCollectorContainer( + "", func(collector testutils.Collector) testutils.Collector { + // deferring running service for exec + c := collector.WithEnv( + map[string]string{ + "HOST_METRICS_RECEIVER": "hostmetrics", + "SPLUNK_CONFIG": "", + "SPLUNK_CONFIG_YAML": config, + }, + ).WithArgs("-c", "trap exit SIGTERM ; echo ok ; while true; do : ; done") + cc := c.(*testutils.CollectorContainer) + cc.Container = cc.Container.WithEntrypoint("bash").WillWaitForLogs("ok") + return cc + }, + ) + + defer shutdown() + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + sc, reader, err := c.Container.Exec(ctx, []string{ + "bash", "-c", "/otelcol --dry-run 2>/dev/null", + }) + assert.NoError(t, err) + require.NotNil(t, reader) + dryRun, err := io.ReadAll(reader) + require.NoError(t, err) + require.True(t, len(dryRun) >= 8) + require.Equal(t, config, string(dryRun[8:len(dryRun)-1])) // strip leading control character + require.Zero(t, sc) + + // confirm successful service functionality + sc, _, err = c.Container.Exec(ctx, []string{"bash", "-c", "/otelcol &"}) + assert.NoError(t, err) + require.Zero(t, sc) + + expectedResourceMetrics := tc.ResourceMetrics("cpu.yaml") + require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedResourceMetrics, 30*time.Second)) +} diff --git a/tests/testutils/container.go b/tests/testutils/container.go index 18dec5dd62..34f5a2105f 100644 --- a/tests/testutils/container.go +++ b/tests/testutils/container.go @@ -45,6 +45,7 @@ type Container struct { Image string ContainerName string ContainerNetworkMode string + Entrypoint []string Cmd []string ContainerNetworks []string ExposedPorts []string @@ -84,6 +85,11 @@ func (container Container) WithContextArchive(contextArchive io.Reader) Containe return container } +func (container Container) WithEntrypoint(entrypoint ...string) Container { + container.Entrypoint = entrypoint + return container +} + func (container Container) WithCmd(cmd ...string) Container { container.Cmd = cmd return container @@ -211,6 +217,7 @@ func (container Container) Build() *Container { Image: container.Image, FromDockerfile: container.Dockerfile, Cmd: container.Cmd, + Entrypoint: container.Entrypoint, Env: container.Env, ExposedPorts: container.ExposedPorts, Name: container.ContainerName, diff --git a/tests/testutils/container_test.go b/tests/testutils/container_test.go index 0ed1391b57..5ab3c152c2 100644 --- a/tests/testutils/container_test.go +++ b/tests/testutils/container_test.go @@ -65,6 +65,11 @@ func TestDockerBuilderMethods(t *testing.T) { assert.NotSame(t, builder, withContextArchive) assert.Nil(t, builder.Dockerfile.ContextArchive) + withEntrypoint := builder.WithEntrypoint("bin", "arg") + assert.Equal(t, []string{"bin", "arg"}, withEntrypoint.Entrypoint) + assert.NotSame(t, builder, withEntrypoint) + assert.Empty(t, builder.Entrypoint) + withCmd := builder.WithCmd("bash", "-c", "'sleep inf'") assert.Equal(t, []string{"bash", "-c", "'sleep inf'"}, withCmd.Cmd) assert.NotSame(t, builder, withCmd) @@ -384,8 +389,8 @@ func (lc *logConsumer) Accept(l testcontainers.Log) { var _ testcontainers.LogConsumer = (*logConsumer)(nil) func TestTestcontainersContainerMethods(t *testing.T) { - alpine := NewContainer().WithImage("alpine").WithCmd( - "sh", "-c", "echo rdy > /tmp/something && tail -f /tmp/something", + alpine := NewContainer().WithImage("alpine").WithEntrypoint("sh", "-c").WithCmd( + "echo rdy > /tmp/something && tail -f /tmp/something", ).WithExposedPorts("12345:12345").WithName("my-alpine").WithNetworks( "bridge", "network_a", "network_b", ).WillWaitForLogs("rdy").Build()