filebeat: make deep copy before notifying of config change #42992

Open
wants to merge 6 commits into
base: main

Conversation

orestisfl
Contributor

@orestisfl orestisfl commented Mar 3, 2025

Proposed commit message

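Make a deep copy of the config before notifying subscribers of a config change. This prevents concurrent reads and writes of the same underlying map.

Unrelated: I've also raised one log line to Info level, which makes it easier to verify from the agent logs that the ES store is being used.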

Fixes #42815
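
For readers unfamiliar with the failure mode, here is a minimal sketch of the "copy before notify" idea using plain Go maps. It is not the filebeat or go-ucfg code: `deepCopy`, `copyValue`, and `notifyAll` are hypothetical helpers, and `map[string]any` only stands in for the real config type. Each subscriber receives its own copy, made on the notifying goroutine, so a subscriber that writes cannot race with one that reads.

```go
package main

import (
	"fmt"
	"sync"
)

// deepCopy clones a nested config-like map so the result shares no
// mutable state with the original. Hypothetical helper, not a libbeat API.
func deepCopy(m map[string]any) map[string]any {
	out := make(map[string]any, len(m))
	for k, v := range m {
		out[k] = copyValue(v)
	}
	return out
}

// copyValue recursively copies nested maps and slices; scalars are
// returned as-is because they are copied by value.
func copyValue(v any) any {
	switch x := v.(type) {
	case map[string]any:
		return deepCopy(x)
	case []any:
		s := make([]any, len(x))
		for i, e := range x {
			s[i] = copyValue(e)
		}
		return s
	default:
		return v
	}
}

// notifyAll runs every subscriber concurrently, handing each one a
// private deep copy that is created before the goroutine starts.
func notifyAll(cfg map[string]any, subs []func(map[string]any)) {
	var wg sync.WaitGroup
	for _, sub := range subs {
		c := deepCopy(cfg)
		wg.Add(1)
		go func(sub func(map[string]any), c map[string]any) {
			defer wg.Done()
			sub(c)
		}(sub, c)
	}
	wg.Wait()
}

func main() {
	cfg := map[string]any{
		"elasticsearch": map[string]any{"username": "elastic"},
	}
	reader := func(c map[string]any) { _ = len(c["elasticsearch"].(map[string]any)) }
	writer := func(c map[string]any) { c["elasticsearch"].(map[string]any)["test"] = "test" }

	notifyAll(cfg, []func(map[string]any){reader, writer, reader, writer})

	// The writers only touched their own copies, so the original still has one key.
	fmt.Println(len(cfg["elasticsearch"].(map[string]any)))
}
```

Running this sketch with `go run -race` should report no data race, whereas passing `cfg` itself to every subscriber would.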

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

How to test this PR locally

This is a blind spot: we reproduced the problem on agentless deployments using the abusech integration (see the linked issue), but I couldn't find a way to reproduce it in an integration or unit test.

The closest I got is this hacky test:

diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go
index ebfecf191c..8efea8eadd 100644
--- a/libbeat/cmd/instance/beat_test.go
+++ b/libbeat/cmd/instance/beat_test.go
@@ -15,14 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//go:build !integration
-
 package instance
 
 import (
 	"bytes"
+	es "github.com/elastic/beats/v7/libbeat/statestore/backend/es"
 	"io/ioutil"
 	"os"
+	"sync"
 	"testing"
 
 	"github.com/elastic/beats/v7/libbeat/cfgfile"
@@ -262,6 +262,72 @@ elasticsearch:
 		require.Same(t, c, m.cfg.Config)
 		require.False(t, b.isConnectionToOlderVersionAllowed(), "allow_older_versions flag should now be set to false")
 	})
+
+	t.Run("test tmp", func(t *testing.T) {
+		b, err := NewBeat("testbeat", "testidx", "0.9", false, nil)
+		require.NoError(t, err)
+
+		cfg := `
+elasticsearch:
+  hosts: ["https://127.0.0.1:9200"]
+  username: "elastic"
+  allow_older_versions: false
+`
+		c, err := config.NewConfigWithYAML([]byte(cfg), cfg)
+		require.NoError(t, err)
+		outCfg, err := c.Child("elasticsearch", -1)
+		require.NoError(t, err)
+
+		var wg sync.WaitGroup
+		defer wg.Wait()
+
+		const size = 100
+		wg.Add(size)
+		notifiers := make([]*es.Notifier, size)
+		for i := 0; i < size; i++ {
+			n := es.NewNotifier()
+			notifiers[i] = n
+			if i%2 == 0 {
+				n.Subscribe(func(c *config.C) {
+					defer wg.Done()
+					t.Log(c.FlattenedKeys())
+				})
+			} else {
+				n.Subscribe(func(c *config.C) {
+					defer wg.Done()
+					c.Merge(map[string]string{"test": "test"})
+				})
+			}
+		}
+
+		update := &reload.ConfigWithMeta{Config: c}
+		b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
+			for i := 0; i < size; i++ {
+				outCfg := config.Namespace{}
+				if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
+					logp.Err("Failed to unpack the output config: %v", err)
+					return nil
+				}
+				c, err := config.NewConfigFrom(outCfg.Config())
+				require.NoError(t, err)
+
+				notifiers[i].Notify(c)
+			}
+			return nil
+		})
+		m := &outputReloaderMock{}
+		reloader := b.makeOutputReloader(m)
+
+		require.False(t, b.Config.Output.IsSet(), "the output should not be set yet")
+		require.True(t, b.isConnectionToOlderVersionAllowed(), "allow_older_versions flag should be true from 8.11")
+
+		err = reloader.Reload(update)
+		require.NoError(t, err)
+		require.True(t, b.Config.Output.IsSet(), "now the output should be set")
+		require.Equal(t, outCfg, b.Config.Output.Config())
+		require.Same(t, c, m.cfg.Config)
+		require.False(t, b.isConnectionToOlderVersionAllowed(), "allow_older_versions flag should now be set to false")
+	})
 }
 
 type outputReloaderMock struct {

However, that test doesn't exercise any of the code to be committed.

Related issues

Screenshots

Errors stop once the image with the fix is deployed
[screenshot]

@orestisfl orestisfl added bug Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team backport-8.x Automated backport to the 8.x branch with mergify backport-8.18 Automated backport to the 8.18 branch backport-9.0 Automated backport to the 9.0 branch backport-active-all Automated backport with mergify to all the active branches labels Mar 3, 2025
@orestisfl orestisfl requested review from cmacknz and blakerouse March 3, 2025 18:01
@orestisfl orestisfl self-assigned this Mar 3, 2025
@orestisfl orestisfl requested a review from a team as a code owner March 3, 2025 18:01
@orestisfl orestisfl requested a review from AndersonQ March 3, 2025 18:01
@elasticmachine
Collaborator

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@botelastic botelastic bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Mar 3, 2025
Member

@AndersonQ AndersonQ left a comment

LGTM. At least one test is failing, but it looks like flakiness, so you might retry it. Ideally, you would also open a flaky test issue so we can track the flakiness.

Btw, should the test you added in the PR description trigger the issue? I tried it here and it passes both with and without the change, so I don't really get the idea of that test.

@orestisfl
Contributor Author

@AndersonQ yes, you are right, I accidentally pasted the "fixed" test. Try the one below a few times.

Just in case it's not clear: that test doesn't actually exercise the filebeat code. I copied the code from filebeat into a separate test to check the hypothesis that values referenced by r.Config can be modified from different goroutines if the config isn't copied first.

I think it's worth discussing better testing approaches for this case, but I'd prefer to merge this bugfix separately.

diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go
index ebfecf191c..f9c290693e 100644
--- a/libbeat/cmd/instance/beat_test.go
+++ b/libbeat/cmd/instance/beat_test.go
@@ -15,14 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//go:build !integration
-
 package instance
 
 import (
 	"bytes"
+	es "github.com/elastic/beats/v7/libbeat/statestore/backend/es"
 	"io/ioutil"
 	"os"
+	"sync"
 	"testing"
 
 	"github.com/elastic/beats/v7/libbeat/cfgfile"
@@ -262,6 +262,69 @@ elasticsearch:
 		require.Same(t, c, m.cfg.Config)
 		require.False(t, b.isConnectionToOlderVersionAllowed(), "allow_older_versions flag should now be set to false")
 	})
+
+	t.Run("test tmp", func(t *testing.T) {
+		b, err := NewBeat("testbeat", "testidx", "0.9", false, nil)
+		require.NoError(t, err)
+
+		cfg := `
+elasticsearch:
+  hosts: ["https://127.0.0.1:9200"]
+  username: "elastic"
+  allow_older_versions: false
+`
+		c, err := config.NewConfigWithYAML([]byte(cfg), cfg)
+		require.NoError(t, err)
+		outCfg, err := c.Child("elasticsearch", -1)
+		require.NoError(t, err)
+
+		var wg sync.WaitGroup
+		defer wg.Wait()
+
+		const size = 100
+		wg.Add(size)
+		notifiers := make([]*es.Notifier, size)
+		for i := 0; i < size; i++ {
+			n := es.NewNotifier()
+			notifiers[i] = n
+			if i%2 == 0 {
+				n.Subscribe(func(c *config.C) {
+					defer wg.Done()
+					t.Log(c.FlattenedKeys())
+				})
+			} else {
+				n.Subscribe(func(c *config.C) {
+					defer wg.Done()
+					c.Merge(map[string]string{"test": "test"})
+				})
+			}
+		}
+
+		update := &reload.ConfigWithMeta{Config: c}
+		b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
+			for i := 0; i < size; i++ {
+				outCfg := config.Namespace{}
+				if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
+					logp.Err("Failed to unpack the output config: %v", err)
+					return nil
+				}
+				notifiers[i].Notify(outCfg.Config())
+			}
+			return nil
+		})
+		m := &outputReloaderMock{}
+		reloader := b.makeOutputReloader(m)
+
+		require.False(t, b.Config.Output.IsSet(), "the output should not be set yet")
+		require.True(t, b.isConnectionToOlderVersionAllowed(), "allow_older_versions flag should be true from 8.11")
+
+		err = reloader.Reload(update)
+		require.NoError(t, err)
+		require.True(t, b.Config.Output.IsSet(), "now the output should be set")
+		require.Equal(t, outCfg, b.Config.Output.Config())
+		require.Same(t, c, m.cfg.Config)
+		require.False(t, b.isConnectionToOlderVersionAllowed(), "allow_older_versions flag should now be set to false")
+	})
 }
 
 type outputReloaderMock struct {
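
The hypothesis behind the test above (handles obtained from the same config can share underlying state unless the config is copied first) can be illustrated deterministically with plain maps. This is only an analogy under stated assumptions: `shallowChild` and `copiedChild` are hypothetical helpers, and nothing here reflects how `config.C` or `Unpack` is actually implemented.

```go
package main

import "fmt"

// shallowChild returns the nested map stored under key. The caller gets
// the same underlying map that root holds, not a copy.
func shallowChild(root map[string]any, key string) map[string]any {
	child, _ := root[key].(map[string]any)
	return child
}

// copiedChild returns a fresh map with the same entries. A one-level
// copy is enough here only because the example values are scalars.
func copiedChild(root map[string]any, key string) map[string]any {
	child, _ := root[key].(map[string]any)
	out := make(map[string]any, len(child))
	for k, v := range child {
		out[k] = v
	}
	return out
}

func main() {
	root := map[string]any{
		"elasticsearch": map[string]any{"username": "elastic"},
	}

	a := shallowChild(root, "elasticsearch")
	b := shallowChild(root, "elasticsearch")
	a["test"] = "test"  // write through one handle...
	fmt.Println(len(b)) // ...is visible through the other: prints 2

	c := copiedChild(root, "elasticsearch")
	c["other"] = "x"    // write to the copy...
	fmt.Println(len(b)) // ...does not reach the shared map: still prints 2
}
```

If `a` and `b` were handed to different goroutines, the write through `a` and a read through `b` would be an unsynchronized concurrent map access, which is the kind of access the deep copy is meant to rule out.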

@orestisfl
Contributor Author

> LGTM. There is at least a test failing, but it seems some flakyness, you might retry it. If you want, you can also open a flaky test issue, that would be the best so we can track the flakyness

#42995

@orestisfl orestisfl enabled auto-merge (squash) March 4, 2025 11:24
Development

Successfully merging this pull request may close these issues.

[Filebeat] Initial errors/failures observed when testing AbuseCH integration in Agentless
3 participants