Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[filebeat] Elasticsearch state storage for httpjson and cel inputs #41446

Merged
merged 77 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
55c72d3
[filebeat] Elasticsearch state storage for httpjson input
aleksmaus Oct 24, 2024
1bf288d
Fixup tests
aleksmaus Oct 24, 2024
e003053
Linter
aleksmaus Oct 25, 2024
dfce978
Enabled elastisearch storage support for cel input and some cleanup
aleksmaus Oct 28, 2024
e2e25fa
Remove the "hack" with .Each implementation
aleksmaus Oct 30, 2024
953355b
Merge branch 'main' into poc/es_state_store
aleksmaus Oct 31, 2024
c1fc2a8
Adjust for the latest main es client signature change
aleksmaus Oct 31, 2024
c9b0256
Make check happy
aleksmaus Oct 31, 2024
21d451d
Fixed missing interface method on test mock store
aleksmaus Oct 31, 2024
ffb9364
Add error check in ES store Each
aleksmaus Oct 31, 2024
10d212f
Parameterize the supported input types through environment variables
aleksmaus Nov 5, 2024
24000d7
Delete the dev tests file
aleksmaus Nov 5, 2024
a0d019e
Address code review comments
aleksmaus Nov 7, 2024
34003d4
Add refresh=wait_for for data consistency
aleksmaus Nov 8, 2024
36e9983
Revert "Add refresh=wait_for for data consistency"
aleksmaus Nov 18, 2024
aee4112
Cleanup
aleksmaus Nov 18, 2024
76b11a4
Added updated_at field
aleksmaus Nov 18, 2024
1fe1c0e
Merge branch 'main' into poc/es_state_store
aleksmaus Nov 19, 2024
3c5bab1
Merge branch 'main' into poc/es_state_store
aleksmaus Nov 25, 2024
4ac1bd0
Eliminate AGENTLESS_ELASTICSEARCH_STATE_STORE_ENABLED setting. The El…
aleksmaus Nov 25, 2024
535560e
Merge branch 'main' into poc/es_state_store
aleksmaus Dec 2, 2024
4940097
Merge branch 'main' into poc/es_state_store
aleksmaus Dec 6, 2024
e73a1b7
Added logging per code review request
aleksmaus Dec 9, 2024
6c39fd0
Some cleanup and notifier utz
aleksmaus Dec 10, 2024
716758f
Add some integration testing coverage
aleksmaus Dec 10, 2024
5ec7ed4
Merge branch 'main' into poc/es_state_store
aleksmaus Dec 11, 2024
904d8e8
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 7, 2025
02d684a
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 7, 2025
5a9cdb5
Fix the Filebeat test harness for agentbeat tests
aleksmaus Jan 7, 2025
fd4cd00
Fixed the imports formatting for the notifier_test
aleksmaus Jan 7, 2025
8dd3938
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 8, 2025
c52bd65
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 9, 2025
8779697
Merge branch 'main' into poc/es_state_store
aleksmaus Jan 13, 2025
c195cae
Remove loading API key through envvar workaround
orestisfl Jan 17, 2025
dfafeff
remove os import
orestisfl Jan 17, 2025
e192747
nitpick
orestisfl Jan 17, 2025
71fd5dc
Use simple Mutex
orestisfl Jan 21, 2025
d4bc727
Notifier: add docs
orestisfl Jan 21, 2025
5569e7f
integration test: check elasticsearch is used
orestisfl Jan 22, 2025
a6fbdb3
Merge branch 'main' into poc/es_state_store
orestisfl Jan 22, 2025
aa96d56
revert loading env in NewFilebeat()
orestisfl Jan 22, 2025
71901b0
Remove Env
orestisfl Jan 22, 2025
970b878
typo
orestisfl Jan 22, 2025
28a8332
HandlerFunc: check error
orestisfl Jan 22, 2025
426c681
Fix log-check
orestisfl Jan 22, 2025
b15b0bb
Re-use existing debug log
orestisfl Jan 22, 2025
3e2aed3
re-use function
orestisfl Jan 22, 2025
89c6e9c
checkFilebeatLogs helper
orestisfl Jan 22, 2025
fa8742e
Make test check store initialization
orestisfl Jan 23, 2025
bdf0777
Debug-log inputID
orestisfl Jan 23, 2025
b71b11e
Introduce var inputUnit
orestisfl Jan 23, 2025
3d11005
UUID input id
orestisfl Jan 23, 2025
6b98fab
Check that zero and then one key is read
orestisfl Jan 23, 2025
af14d49
Check inputID is configured
orestisfl Jan 23, 2025
4fc4566
output unit func
orestisfl Jan 23, 2025
3e859b8
final atomic.Bool
orestisfl Jan 23, 2025
3041e60
Merge remote-tracking branch 'origin/main' into poc/es_state_store
orestisfl Jan 23, 2025
083fd86
comment
orestisfl Jan 23, 2025
2e173a4
Change uuid library
orestisfl Jan 23, 2025
c9ebe0f
Merge branch 'main' into poc/es_state_store
orestisfl Jan 24, 2025
c69aefe
check number of http calls
orestisfl Jan 23, 2025
f49cbfd
typo
orestisfl Jan 27, 2025
0cb23d7
integration test: Add more elaborate checks
orestisfl Jan 27, 2025
fe16a10
Fix delta
orestisfl Jan 27, 2025
89ac0bf
Merge branch 'main' into poc/es_state_store
orestisfl Jan 27, 2025
8770e00
Revert input-logfile changes
orestisfl Jan 27, 2025
cef8721
fix store_test.go
orestisfl Jan 27, 2025
a988659
reduce debug-log
orestisfl Jan 27, 2025
51cb319
avoid return error
orestisfl Jan 27, 2025
8d362da
cleanup
orestisfl Jan 27, 2025
59c4d52
notifier: Call listener with last config
orestisfl Jan 28, 2025
5b27a39
Simplify test
orestisfl Jan 28, 2025
040f3c5
Merge branch 'main' into poc/es_state_store
orestisfl Jan 28, 2025
c490984
nitpick
orestisfl Jan 28, 2025
1b7842e
sanity
orestisfl Jan 28, 2025
9e4c86e
Add unit tests for feature flag
orestisfl Jan 28, 2025
c1f5971
godoc
orestisfl Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
Expand Down Expand Up @@ -81,7 +82,9 @@ type Filebeat struct {
type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin

type StateStore interface {
Access() (*statestore.Store, error)
// Access returns the storage registry depending on the type. This is needed for the Elasticsearch state store which
// is guarded by the feature.IsElasticsearchStateStoreEnabledForInput(typ) check.
Access(typ string) (*statestore.Store, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Godoc for this parameter?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest something like

	// Access returns the storage registry depending on the type.
	// The value of typ is expected to have been obtained from
	// [cursor.InputManager.Type] and represents the input type.

CleanupInterval() time.Duration
}

Expand Down Expand Up @@ -300,13 +303,36 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}

stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
// Use context, like normal people do, hooking up to the beat.done channel
ctx, cn := context.WithCancel(context.Background())
go func() {
<-fb.done
cn()
}()

stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()

// If notifier is set, configure the listener for output configuration
// The notifier passes the elasticsearch output configuration down to the Elasticsearch backed state storage
// in order to allow it fully configure
if stateStore.notifier != nil {
b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
outCfg := conf.Namespace{}
if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
logp.Err("Failed to unpack the output config: %v", err)
return nil
}

stateStore.notifier.Notify(outCfg.Config())
return nil
})
}

err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream"))
if err != nil {
logp.Err("invalid filestream configuration: %+v", err)
Expand Down Expand Up @@ -357,6 +383,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer func() {
_ = inputTaskGroup.Stop()
}()

// Store needs to be fully configured at this point
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
return err
Expand Down Expand Up @@ -541,7 +569,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
return nil
}

store, err := stateStore.Access()
store, err := stateStore.Access("")
if err != nil {
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
Expand Down
46 changes: 40 additions & 6 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,77 @@
package beater

import (
"context"
"time"

"github.com/elastic/beats/v7/filebeat/config"
"github.com/elastic/beats/v7/filebeat/features"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/backend"
"github.com/elastic/beats/v7/libbeat/statestore/backend/es"
"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

type filebeatStore struct {
registry *statestore.Registry
esRegistry *statestore.Registry
storeName string
cleanInterval time.Duration

// Notifies the Elasticsearch store about configuration change
// which is available only after the beat runtime manager connects to the Agent
// and receives the output configuration
notifier *es.Notifier
}

func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
memlog, err := memlog.New(logger, memlog.Settings{
func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
var (
reg backend.Registry
err error

esreg *es.Registry
notifier *es.Notifier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Optional) pointers to potentially nil objects are common places where bugs could be introduced. How about an interface with two implementations, one no-op and one the actual notifier.

)

if features.IsElasticsearchStateStoreEnabled() {
notifier = es.NewNotifier()
esreg = es.New(ctx, logger, notifier)
}

reg, err = memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
if err != nil {
return nil, err
}

return &filebeatStore{
registry: statestore.NewRegistry(memlog),
store := &filebeatStore{
registry: statestore.NewRegistry(reg),
storeName: info.Beat,
cleanInterval: cfg.CleanInterval,
}, nil
notifier: notifier,
}

if esreg != nil {
store.esRegistry = statestore.NewRegistry(esreg)
}

return store, nil
}

func (s *filebeatStore) Close() {
s.registry.Close()
}

func (s *filebeatStore) Access() (*statestore.Store, error) {
// Access returns the storage registry depending on the type. Default is the file store.
func (s *filebeatStore) Access(typ string) (*statestore.Store, error) {
if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil {
return s.esRegistry.Get(s.storeName)
}
return s.registry.Get(s.storeName)
}

Expand Down
59 changes: 59 additions & 0 deletions filebeat/features/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package features

import (
"os"
"strings"
)

// List of input types Elasticsearch state store is enabled for
var esTypesEnabled map[string]struct{}

var isESEnabled bool

func init() {
initFromEnv("AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES")
}

func initFromEnv(envName string) {
esTypesEnabled = make(map[string]struct{})

arr := strings.Split(os.Getenv(envName), ",")
for _, e := range arr {
k := strings.TrimSpace(e)
if k != "" {
esTypesEnabled[k] = struct{}{}
}
}
isESEnabled = len(esTypesEnabled) > 0
}

// IsElasticsearchStateStoreEnabled returns true if feature is enabled for agentless
func IsElasticsearchStateStoreEnabled() bool {
return isESEnabled
}

// IsElasticsearchStateStoreEnabledForInput returns true if the provided input type uses Elasticsearch for state storage if the Elasticsearch state store feature is enabled
func IsElasticsearchStateStoreEnabledForInput(inputType string) bool {
if IsElasticsearchStateStoreEnabled() {
_, ok := esTypesEnabled[inputType]
return ok
}
return false
}
86 changes: 86 additions & 0 deletions filebeat/features/features_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package features

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_initFromEnv(t *testing.T) {
const envName = "TEST_AGENTLESS_ENV"

t.Run("Without setting env", func(t *testing.T) {
// default init
assert.False(t, IsElasticsearchStateStoreEnabled())
assert.Empty(t, esTypesEnabled)
assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx"))

// init from env
initFromEnv(envName)
assert.False(t, IsElasticsearchStateStoreEnabled())
assert.Empty(t, esTypesEnabled)
assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx"))
})

tests := []struct {
name string
value string
wantEnabled bool
wantContains []string
}{
{
name: "Empty",
value: "",
wantEnabled: false,
wantContains: nil,
},
{
name: "Single value",
value: "xxx",
wantEnabled: true,
wantContains: []string{"xxx"},
},
{
name: "Multiple values",
value: "xxx,yyy",
wantEnabled: true,
wantContains: []string{"xxx", "yyy"},
},
{
name: "Multiple values with spaces",
value: ",,, , xxx , yyy, ,,,,",
wantEnabled: true,
wantContains: []string{"xxx", "yyy"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Setenv(envName, tt.value)
initFromEnv(envName)

assert.Equal(t, tt.wantEnabled, IsElasticsearchStateStoreEnabled())
for _, contain := range tt.wantContains {
assert.Contains(t, esTypesEnabled, contain)
assert.True(t, IsElasticsearchStateStoreEnabledForInput(contain))
}
assert.Len(t, esTypesEnabled, len(tt.wantContains))
})
}
}
8 changes: 4 additions & 4 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string {
}

func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")

actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
Expand Down Expand Up @@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")
id := getIDFromPath(filepath, inputID, fi)

var entry registryEntry
Expand All @@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect
}

func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) {
inputStore, _ := e.stateStore.Access()
inputStore, _ := e.stateStore.Access("")

var entry registryEntry
err := inputStore.Get(key, &entry)
Expand Down Expand Up @@ -553,7 +553,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *testStore) Close() {
s.registry.Close()
}

func (s *testStore) Access() (*statestore.Store, error) {
func (s *testStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filestream-benchmark")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ const globalInputID = ".global"

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ var closeStore = (*store).close
func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
ok := false

persistentStore, err := statestore.Access()
persistentStore, err := statestore.Access("")
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore {

func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts }
func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod }
func (ts testStateStore) Access() (*statestore.Store, error) {
func (ts testStateStore) Access(string) (*statestore.Store, error) {
if ts.Store == nil {
return nil, errors.New("no store configured")
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access() (*statestore.Store, error) {
func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestInputSeek(t *testing.T) {
env := newInputTestingEnvironment(t)

if testCase.cursor != "" {
store, _ := env.stateStore.Access()
store, _ := env.stateStore.Access("")
tmp := map[string]any{}
if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil {
t.Fatal(err)
Expand Down
Loading
Loading