From 9ece0af4503d2a364c405ec2dbf3ee87588cbf61 Mon Sep 17 00:00:00 2001 From: Silvia Mitter Date: Fri, 10 May 2019 14:42:58 +0200 Subject: [PATCH 1/3] Add minimal ES template functionality. (#12103) When loading a template without fields, create a minimal template only applying given configuration, without any default values for mappings and settings. This allows to create additional templates only defining specific values. --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/template/load.go | 12 + libbeat/template/load_integration_test.go | 397 +++++++++---------- libbeat/template/load_test.go | 71 ++-- libbeat/template/template.go | 19 + libbeat/template/testdata/default_fields.yml | 7 + libbeat/template/testdata/fields.json | 16 + 7 files changed, 273 insertions(+), 250 deletions(-) create mode 100644 libbeat/template/testdata/default_fields.yml create mode 100644 libbeat/template/testdata/fields.json diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 1162a5835753..d4bbd0d5eaea 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -38,3 +38,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Reduce idxmgmt.Supporter interface and rework export commands to reuse logic. {pull}11777[11777], {pull}12065[12065], {pull}12067[12067] - Update urllib3 version to 1.24.2 {pull}11930[11930] - Add libbeat/common/cleanup package. {pull}12134[12134] +- Only Load minimal template if no fields are provided. {pull}12103[12103] \ No newline at end of file diff --git a/libbeat/template/load.go b/libbeat/template/load.go index 2d8fdbf6607e..33b9886dea85 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -175,6 +175,9 @@ func buildBody(tmpl *Template, config TemplateConfig, fields []byte) (common.Map if config.Fields != "" { return buildBodyFromFile(tmpl, config) } + if fields == nil { + return buildMinimalTemplate(tmpl) + } return buildBodyFromFields(tmpl, fields) } @@ -216,6 +219,15 @@ func buildBodyFromFields(tmpl *Template, fields []byte) (common.MapStr, error) { return body, nil } +func buildMinimalTemplate(tmpl *Template) (common.MapStr, error) { + logp.Debug("template", "Load minimal template") + body, err := tmpl.LoadMinimal() + if err != nil { + return nil, fmt.Errorf("error creating mimimal template: %v", err) + } + return body, nil +} + func esVersionParams(ver common.Version) map[string]string { if ver.Major == 6 && ver.Minor == 7 { return map[string]string{ diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index e33f8aae294a..ae393010f0db 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -22,245 +22,225 @@ package template import ( "encoding/json" "fmt" + "io/ioutil" + "math/rand" "path/filepath" "strconv" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/outputs/elasticsearch" "github.com/elastic/beats/libbeat/outputs/elasticsearch/estest" "github.com/elastic/beats/libbeat/version" ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + type testTemplate struct { t *testing.T client ESClient common.MapStr } -var ( - beatInfo = beat.Info{ - Beat: "testbeat", - IndexPrefix: "testbeatidx", - Version: version.GetDefaultVersion(), - } - - templateName = "testbeatidx-" + version.GetDefaultVersion() -) +type 
testSetup struct { + t *testing.T + client ESClient + loader *ESLoader + config TemplateConfig +} -func defaultESLoader(t *testing.T) *ESLoader { +func newTestSetup(t *testing.T, cfg TemplateConfig) *testSetup { + if cfg.Name == "" { + cfg.Name = fmt.Sprintf("load-test-%+v", rand.Int()) + } client := estest.GetTestingElasticsearch(t) if err := client.Connect(); err != nil { t.Fatal(err) } - - return NewESLoader(client) + s := testSetup{t: t, client: client, loader: NewESLoader(client), config: cfg} + client.Request("DELETE", "/_template/"+cfg.Name, "", nil, nil) + require.False(t, s.loader.templateExists(cfg.Name)) + return &s } - -func TestCheckTemplate(t *testing.T) { - loader := defaultESLoader(t) - - // Check for non existent template - assert.False(t, loader.templateExists("libbeat-notexists")) +func (ts *testSetup) loadFromFile(fileElems []string) error { + ts.config.Fields = path(ts.t, fileElems) + beatInfo := beat.Info{Version: version.GetDefaultVersion()} + return ts.loader.Load(ts.config, beatInfo, nil, false) } -func TestLoadTemplate(t *testing.T) { - // Setup ES - loader := defaultESLoader(t) - client := loader.client - - // Load template - absPath, err := filepath.Abs("../") - assert.NotNil(t, absPath) - assert.Nil(t, err) - - fieldsPath := absPath + "/fields.yml" - index := "testbeat" - - tmpl, err := New(version.GetDefaultVersion(), index, client.GetVersion(), TemplateConfig{}, false) - require.NoError(t, err) - content, err := tmpl.LoadFile(fieldsPath) - require.NoError(t, err) - - // Load template - err = loader.loadTemplate(tmpl.GetName(), content) - require.NoError(t, err) +func (ts *testSetup) load(fields []byte) error { + beatInfo := beat.Info{Version: version.GetDefaultVersion()} + return ts.loader.Load(ts.config, beatInfo, fields, false) +} - // Make sure template was loaded - assert.True(t, loader.templateExists(tmpl.GetName())) +func (ts *testSetup) mustLoad(fields []byte) { + require.NoError(ts.t, ts.load(fields)) + require.True(ts.t, ts.loader.templateExists(ts.config.Name)) +} - // Delete template again to clean up - client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) +func TestESLoader_Load(t *testing.T) { + t.Run("failure", func(t *testing.T) { + t.Run("loading disabled", func(t *testing.T) { + setup := newTestSetup(t, TemplateConfig{Enabled: false}) + + setup.load(nil) + assert.False(t, setup.loader.templateExists(setup.config.Name)) + }) + + t.Run("invalid version", func(t *testing.T) { + setup := newTestSetup(t, TemplateConfig{Enabled: true}) + + beatInfo := beat.Info{Version: "invalid"} + err := setup.loader.Load(setup.config, beatInfo, nil, false) + if assert.Error(t, err) { + assert.Contains(t, err.Error(), "version is not semver") + } + }) + }) + + t.Run("overwrite", func(t *testing.T) { + // Setup create template with source enabled + setup := newTestSetup(t, TemplateConfig{Enabled: true}) + setup.mustLoad(nil) + + // Add custom settings + setup.config.Settings = TemplateSettings{Source: map[string]interface{}{"enabled": false}} + + t.Run("disabled", func(t *testing.T) { + setup.load(nil) + tmpl := getTemplate(t, setup.client, setup.config.Name) + assert.Equal(t, true, tmpl.SourceEnabled()) + }) + + t.Run("enabled", func(t *testing.T) { + setup.config.Overwrite = true + setup.load(nil) + tmpl := getTemplate(t, setup.client, setup.config.Name) + assert.Equal(t, false, tmpl.SourceEnabled()) + }) + }) + + t.Run("json.name", func(t *testing.T) { + nameJSON := "bar" + + setup := newTestSetup(t, TemplateConfig{Enabled: true}) + 
setup.mustLoad(nil) + + // Load Template with same name, but different JSON.name and ensure it is used + setup.config.JSON = struct { + Enabled bool `config:"enabled"` + Path string `config:"path"` + Name string `config:"name"` + }{Enabled: true, Path: path(t, []string{"testdata", "fields.json"}), Name: nameJSON} + setup.load(nil) + assert.True(t, setup.loader.templateExists(nameJSON)) + }) + + t.Run("load template successful", func(t *testing.T) { + fields, err := ioutil.ReadFile(path(t, []string{"testdata", "default_fields.yml"})) + require.NoError(t, err) + for run, data := range map[string]struct { + cfg TemplateConfig + fields []byte + fieldsPath string + properties []string + }{ + "default config with fields": { + cfg: TemplateConfig{Enabled: true}, + fields: fields, + properties: []string{"foo", "bar"}, + }, + "minimal template": { + cfg: TemplateConfig{Enabled: true}, + fields: nil, + }, + "fields from file": { + cfg: TemplateConfig{Enabled: true, Fields: path(t, []string{"testdata", "fields.yml"})}, + fields: fields, + properties: []string{"object", "keyword", "alias", "migration_alias_false", "object_disabled"}, + }, + "fields from json": { + cfg: TemplateConfig{Enabled: true, Name: "json-template", JSON: struct { + Enabled bool `config:"enabled"` + Path string `config:"path"` + Name string `config:"name"` + }{Enabled: true, Path: path(t, []string{"testdata", "fields.json"}), Name: "json-template"}}, + fields: fields, + properties: []string{"host_name"}, + }, + } { + t.Run(run, func(t *testing.T) { + setup := newTestSetup(t, data.cfg) + setup.mustLoad(data.fields) + + // Fetch properties + tmpl := getTemplate(t, setup.client, setup.config.Name) + val, err := tmpl.GetValue("mappings.properties") + if data.properties == nil { + assert.Error(t, err) + } else { + require.NoError(t, err) + p, ok := val.(map[string]interface{}) + require.True(t, ok) + var properties []string + for k := range p { + properties = append(properties, k) + } + assert.ElementsMatch(t, properties, data.properties) + } + }) + } + }) +} - // Make sure it was removed - assert.False(t, loader.templateExists(tmpl.GetName())) +func TestTemplate_LoadFile(t *testing.T) { + setup := newTestSetup(t, TemplateConfig{Enabled: true}) + assert.NoError(t, setup.loadFromFile([]string{"..", "fields.yml"})) + assert.True(t, setup.loader.templateExists(setup.config.Name)) } func TestLoadInvalidTemplate(t *testing.T) { - // Invalid Template - template := map[string]interface{}{ - "json": "invalid", - } - - // Setup ES - loader := defaultESLoader(t) - - templateName := "invalidtemplate" + setup := newTestSetup(t, TemplateConfig{}) // Try to load invalid template - err := loader.loadTemplate(templateName, template) + template := map[string]interface{}{"json": "invalid"} + err := setup.loader.loadTemplate(setup.config.Name, template) assert.Error(t, err) - - // Make sure template was not loaded - assert.False(t, loader.templateExists(templateName)) + assert.False(t, setup.loader.templateExists(setup.config.Name)) } // Tests loading the templates for each beat -func TestLoadBeatsTemplate(t *testing.T) { +func TestLoadBeatsTemplate_fromFile(t *testing.T) { beats := []string{ "libbeat", } for _, beat := range beats { - // Load template - absPath, err := filepath.Abs("../../" + beat) - assert.NotNil(t, absPath) - assert.Nil(t, err) - - // Setup ES - loader := defaultESLoader(t) - client := loader.client - - fieldsPath := absPath + "/fields.yml" - index := beat - - tmpl, err := New(version.GetDefaultVersion(), index, client.GetVersion(), 
TemplateConfig{}, false) - assert.NoError(t, err) - content, err := tmpl.LoadFile(fieldsPath) - assert.NoError(t, err) - - // Load template - err = loader.loadTemplate(tmpl.GetName(), content) - assert.Nil(t, err) - - // Make sure template was loaded - assert.True(t, loader.templateExists(tmpl.GetName())) - - // Delete template again to clean up - client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) - - // Make sure it was removed - assert.False(t, loader.templateExists(tmpl.GetName())) + setup := newTestSetup(t, TemplateConfig{Name: beat, Enabled: true}) + assert.NoError(t, setup.loadFromFile([]string{"..", "..", beat, "fields.yml"})) + assert.True(t, setup.loader.templateExists(setup.config.Name)) } } func TestTemplateSettings(t *testing.T) { - // Setup ES - loader := defaultESLoader(t) - client := loader.client - - // Load template - absPath, err := filepath.Abs("../") - assert.NotNil(t, absPath) - assert.Nil(t, err) - - fieldsPath := absPath + "/fields.yml" - settings := TemplateSettings{ - Index: common.MapStr{ - "number_of_shards": 1, - }, - Source: common.MapStr{ - "enabled": false, - }, + Index: common.MapStr{"number_of_shards": 1}, + Source: common.MapStr{"enabled": false}, } - config := TemplateConfig{ - Settings: settings, - } - tmpl, err := New(version.GetDefaultVersion(), "testbeat", client.GetVersion(), config, false) - assert.NoError(t, err) - content, err := tmpl.LoadFile(fieldsPath) - assert.NoError(t, err) - - // Load template - err = loader.loadTemplate(tmpl.GetName(), content) - assert.Nil(t, err) + setup := newTestSetup(t, TemplateConfig{Settings: settings, Enabled: true}) + require.NoError(t, setup.loadFromFile([]string{"..", "fields.yml"})) // Check that it contains the mapping - templateJSON := getTemplate(t, client, tmpl.GetName()) + templateJSON := getTemplate(t, setup.client, setup.config.Name) assert.Equal(t, 1, templateJSON.NumberOfShards()) assert.Equal(t, false, templateJSON.SourceEnabled()) - - // Delete template again to clean up - client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) - - // Make sure it was removed - assert.False(t, loader.templateExists(tmpl.GetName())) -} - -func TestOverwrite(t *testing.T) { - // Setup ES - loader := defaultESLoader(t) - client := loader.client - - templateName := "testbeatidx-" + version.GetDefaultVersion() - - absPath, err := filepath.Abs("../") - assert.NotNil(t, absPath) - assert.Nil(t, err) - - // make sure no template is already there - client.Request("DELETE", "/_template/"+templateName, "", nil, nil) - - // Load template - config := TemplateConfig{ - Enabled: true, - Fields: absPath + "/fields.yml", - } - err = loader.Load(config, beatInfo, nil, false) - assert.NoError(t, err) - - // Load template again, this time with custom settings - config = TemplateConfig{ - Enabled: true, - Fields: absPath + "/fields.yml", - Settings: TemplateSettings{ - Source: map[string]interface{}{ - "enabled": false, - }, - }, - } - - err = loader.Load(config, beatInfo, nil, false) - assert.NoError(t, err) - - // Overwrite was not enabled, so the first version should still be there - templateJSON := getTemplate(t, client, templateName) - assert.Equal(t, true, templateJSON.SourceEnabled()) - - // Load template again, this time with custom settings AND overwrite: true - config = TemplateConfig{ - Enabled: true, - Overwrite: true, - Fields: absPath + "/fields.yml", - Settings: TemplateSettings{ - Source: map[string]interface{}{ - "enabled": false, - }, - }, - } - err = loader.Load(config, beatInfo, nil, 
false) - assert.NoError(t, err) - - // Overwrite was enabled, so the custom setting should be there - templateJSON = getTemplate(t, client, templateName) - assert.Equal(t, false, templateJSON.SourceEnabled()) - - // Delete template again to clean up - client.Request("DELETE", "/_template/"+templateName, "", nil, nil) } var dataTests = []struct { @@ -307,31 +287,12 @@ var dataTests = []struct { // Tests if data can be loaded into elasticsearch with right types func TestTemplateWithData(t *testing.T) { - fieldsPath, err := filepath.Abs("./testdata/fields.yml") - assert.NotNil(t, fieldsPath) - assert.Nil(t, err) - - // Setup ES - client := estest.GetTestingElasticsearch(t) - if err := client.Connect(); err != nil { - t.Fatal(err) - } - loader := NewESLoader(client) - - tmpl, err := New(version.GetDefaultVersion(), "testindex", client.GetVersion(), TemplateConfig{}, false) - assert.NoError(t, err) - content, err := tmpl.LoadFile(fieldsPath) - assert.NoError(t, err) - - // Load template - err = loader.loadTemplate(tmpl.GetName(), content) - assert.Nil(t, err) - - // Make sure template was loaded - assert.True(t, loader.templateExists(tmpl.GetName())) - + setup := newTestSetup(t, TemplateConfig{Enabled: true}) + require.NoError(t, setup.loadFromFile([]string{"testdata", "fields.yml"})) + require.True(t, setup.loader.templateExists(setup.config.Name)) + esClient := setup.client.(*elasticsearch.Client) for _, test := range dataTests { - _, _, err = client.Index(tmpl.GetName(), "_doc", "", nil, test.data) + _, _, err := esClient.Index(setup.config.Name, "_doc", "", nil, test.data) if test.error { assert.NotNil(t, err) @@ -339,22 +300,16 @@ func TestTemplateWithData(t *testing.T) { assert.Nil(t, err) } } - - // Delete template again to clean up - client.Request("DELETE", "/_template/"+tmpl.GetName(), "", nil, nil) - - // Make sure it was removed - assert.False(t, loader.templateExists(tmpl.GetName())) } func getTemplate(t *testing.T, client ESClient, templateName string) testTemplate { status, body, err := client.Request("GET", "/_template/"+templateName, "", nil, nil) - assert.NoError(t, err) - assert.Equal(t, status, 200) + require.NoError(t, err) + require.Equal(t, status, 200) var response common.MapStr err = json.Unmarshal(body, &response) - assert.NoError(t, err) + require.NoError(t, err) return testTemplate{ t: t, @@ -389,3 +344,9 @@ func (tt *testTemplate) NumberOfShards() int { require.NoError(tt.t, err) return i } + +func path(t *testing.T, fileElems []string) string { + fieldsPath, err := filepath.Abs(filepath.Join(fileElems...)) + require.NoError(t, err) + return fieldsPath +} diff --git a/libbeat/template/load_test.go b/libbeat/template/load_test.go index 07f0ca7ad5ba..ff0d6bfece45 100644 --- a/libbeat/template/load_test.go +++ b/libbeat/template/load_test.go @@ -23,7 +23,6 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/version" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -33,62 +32,70 @@ func TestFileLoader_Load(t *testing.T) { ver := "7.0.0" prefix := "mock" info := beat.Info{Version: ver, IndexPrefix: prefix} + tmplName := fmt.Sprintf("%s-%s", prefix, ver) for name, test := range map[string]struct { - cfg TemplateConfig - fields []byte - migration bool - - name string + settings TemplateSettings + body common.MapStr }{ - "default config": { - cfg: DefaultConfig(), - name: fmt.Sprintf("%s-%s", prefix, ver), - migration: false, + "load minimal config info": { + body: 
common.MapStr{ + "index_patterns": []string{"mock-7.0.0-*"}, + "order": 0, + "settings": common.MapStr{"index": nil}}, + }, + "load minimal config with index settings": { + settings: TemplateSettings{Index: common.MapStr{"code": "best_compression"}}, + body: common.MapStr{ + "index_patterns": []string{"mock-7.0.0-*"}, + "order": 0, + "settings": common.MapStr{"index": common.MapStr{"code": "best_compression"}}}, }, - "default config with migration": { - cfg: DefaultConfig(), - name: fmt.Sprintf("%s-%s", prefix, ver), - migration: true, + "load minimal config with source settings": { + settings: TemplateSettings{Source: common.MapStr{"enabled": false}}, + body: common.MapStr{ + "index_patterns": []string{"mock-7.0.0-*"}, + "order": 0, + "settings": common.MapStr{"index": nil}, + "mappings": common.MapStr{ + "_source": common.MapStr{"enabled": false}, + "_meta": common.MapStr{"beat": prefix, "version": ver}, + "date_detection": false, + "dynamic_templates": nil, + "properties": nil, + }}, }, } { t.Run(name, func(t *testing.T) { fc, err := newFileClient(ver) require.NoError(t, err) fl := NewFileLoader(fc) - err = fl.Load(test.cfg, info, test.fields, false) - require.NoError(t, err) - tmpl, err := New(ver, prefix, *common.MustNewVersion(ver), test.cfg, test.migration) - require.NoError(t, err) - body, err := buildBody(tmpl, test.cfg, test.fields) + cfg := DefaultConfig() + cfg.Settings = test.settings + + err = fl.Load(cfg, info, nil, false) require.NoError(t, err) - assert.Equal(t, body.StringToPrint()+"\n", fc.body) + assert.Equal(t, "template", fc.component) + assert.Equal(t, tmplName, fc.name) + assert.Equal(t, test.body.StringToPrint()+"\n", fc.body) }) } } type fileClient struct { - ver common.Version - kind, name, body string + component, name, body, ver string } func newFileClient(ver string) (*fileClient, error) { - if ver == "" { - ver = version.GetDefaultVersion() - } - v, err := common.NewVersion(ver) - if err != nil { - return nil, err - } - return &fileClient{ver: *v}, nil + return &fileClient{ver: ver}, nil } func (c *fileClient) GetVersion() common.Version { - return c.ver + return *common.MustNewVersion(c.ver) } func (c *fileClient) Write(component string, name string, body string) error { - c.kind, c.name, c.body = component, name, body + c.component, c.name, c.body = component, name, body return nil } diff --git a/libbeat/template/template.go b/libbeat/template/template.go index 092280cdd6ad..ecdc38d0d5f6 100644 --- a/libbeat/template/template.go +++ b/libbeat/template/template.go @@ -183,6 +183,25 @@ func (t *Template) LoadBytes(data []byte) (common.MapStr, error) { return t.load(fields) } +// LoadMinimal loads the template only with the given configuration +func (t *Template) LoadMinimal() (common.MapStr, error) { + keyPattern, patterns := buildPatternSettings(t.esVersion, t.GetPattern()) + m := common.MapStr{ + keyPattern: patterns, + "order": t.order, + "settings": common.MapStr{ + "index": t.config.Settings.Index, + }, + } + if t.config.Settings.Source != nil { + m["mappings"] = buildMappings( + t.beatVersion, t.esVersion, t.beatName, + nil, nil, + common.MapStr(t.config.Settings.Source)) + } + return m, nil +} + // GetName returns the name of the template func (t *Template) GetName() string { return t.name diff --git a/libbeat/template/testdata/default_fields.yml b/libbeat/template/testdata/default_fields.yml new file mode 100644 index 000000000000..370f31993402 --- /dev/null +++ b/libbeat/template/testdata/default_fields.yml @@ -0,0 +1,7 @@ +- key: test + title: Test 
default fieldds + fields: + - name: foo + type: keyword + - name: bar + type: keyword diff --git a/libbeat/template/testdata/fields.json b/libbeat/template/testdata/fields.json new file mode 100644 index 000000000000..d95b7a7dabe0 --- /dev/null +++ b/libbeat/template/testdata/fields.json @@ -0,0 +1,16 @@ +{ + "index_patterns": ["foo"], + "settings": { + "number_of_shards": 1 + }, + "mappings": { + "_source": { + "enabled": false + }, + "properties": { + "host_name": { + "type": "keyword" + } + } + } +} From 056d921dd64cc9155def5f9cb81c7a26f5bab40a Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 10 May 2019 09:28:29 -0400 Subject: [PATCH 2/3] Refactor logging in pgsql module (#12151) Guard debug logging statements with "isDebug" checks. And switch the module over to using named loggers. Fixes #12150 --- CHANGELOG.next.asciidoc | 1 + packetbeat/protos/pgsql/parse.go | 98 +++++++++++++++----------------- packetbeat/protos/pgsql/pgsql.go | 47 ++++++++++----- 3 files changed, 80 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index eaa4a309c6c7..4dec4b62bbe0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Prevent duplicate packet loss error messages in HTTP events. {pull}10709[10709] - Avoid reporting unknown MongoDB opcodes more than once. {pull}10878[10878] - Fixed a memory leak when using process monitoring under Windows. {pull}12100[12100] +- Improved debug logging efficiency in PGQSL module. {issue}12150[12150] *Winlogbeat* diff --git a/packetbeat/protos/pgsql/parse.go b/packetbeat/protos/pgsql/parse.go index e1bd915335f3..cf7d9f72d97e 100644 --- a/packetbeat/protos/pgsql/parse.go +++ b/packetbeat/protos/pgsql/parse.go @@ -22,7 +22,6 @@ import ( "strings" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" ) var ( @@ -34,7 +33,7 @@ var ( ) func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) { - debugf("pgsqlMessageParser, off=%v", s.parseOffset) + pgsql.debugf("pgsqlMessageParser, off=%v", s.parseOffset) var ok, complete bool @@ -46,22 +45,22 @@ func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) { case pgsqlExtendedQueryState: ok, complete = pgsql.parseMessageExtendedQuery(s) default: - logp.Critical("Pgsql invalid parser state") + pgsql.log.Error("Pgsql invalid parser state") } - detailedf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v", + pgsql.detailf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v", ok, complete, s.parseOffset) return ok, complete } func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) { - detailedf("parseMessageStart") + pgsql.detailf("parseMessageStart") m := s.message for len(s.data[s.parseOffset:]) >= 5 { - isSpecial, length, command := isSpecialPgsqlCommand(s.data[s.parseOffset:]) + isSpecial, length, command := pgsql.isSpecialCommand(s.data[s.parseOffset:]) if !isSpecial { return pgsql.parseCommand(s) } @@ -71,7 +70,7 @@ func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) { // check buffer available if len(s.data[s.parseOffset:]) <= length { - detailedf("Wait for more data 1") + pgsql.detailf("Wait for more data 1") return true, false } @@ -103,7 +102,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) { m := s.message // one byte reply to SSLRequest - detailedf("Reply for SSLRequest %c", typ) + pgsql.detailf("Reply for SSLRequest %c", typ) 
m.start = s.parseOffset s.parseOffset++ m.end = s.parseOffset @@ -118,15 +117,15 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) { length := readLength(s.data[s.parseOffset+1:]) if length < 4 { // length should include the size of itself (int32) - detailedf("Invalid pgsql command length.") + pgsql.detailf("Invalid pgsql command length.") return false, false } if len(s.data[s.parseOffset:]) <= length { - detailedf("Wait for more data") + pgsql.detailf("Wait for more data") return true, false } - detailedf("Pgsql type %c, length=%d", typ, length) + pgsql.detailf("Pgsql type %c, length=%d", typ, length) switch typ { case 'Q': @@ -147,7 +146,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) { return pgsql.parseExtResp(s, length) default: if !pgsqlValidType(typ) { - detailedf("invalid frame type: '%c'", typ) + pgsql.detailf("invalid frame type: '%c'", typ) return false, false } return pgsql.parseSkipMessage(s, length) @@ -172,7 +171,7 @@ func (pgsql *pgsqlPlugin) parseSimpleQuery(s *pgsqlStream, length int) (bool, bo m.query = query m.toExport = true - detailedf("Simple Query: %s", m.query) + pgsql.detailf("Simple Query: %s", m.query) return true, true } @@ -184,12 +183,12 @@ func (pgsql *pgsqlPlugin) parseRowDescription(s *pgsqlStream, length int) (bool, m.isOK = true m.toExport = true - err := pgsqlFieldsParser(s, s.data[s.parseOffset+5:s.parseOffset+length+1]) + err := pgsql.parseFields(s, s.data[s.parseOffset+5:s.parseOffset+length+1]) if err != nil { - detailedf("fields parse failed with: %v", err) + pgsql.detailf("parseFields failed with: %v", err) return false, false } - detailedf("Fields: %s", m.fields) + pgsql.detailf("Fields: %s", m.fields) s.parseOffset++ //type s.parseOffset += length //length @@ -218,7 +217,7 @@ func (pgsql *pgsqlPlugin) parseEmptyQueryResponse(s *pgsqlStream) (bool, bool) { m := s.message - detailedf("EmptyQueryResponse") + pgsql.detailf("EmptyQueryResponse") m.start = s.parseOffset m.isOK = true m.isRequest = false @@ -245,7 +244,7 @@ func (pgsql *pgsqlPlugin) parseCommandComplete(s *pgsqlStream, length int) (bool return false, false } - detailedf("CommandComplete length=%d, tag=%s", length, name) + pgsql.detailf("CommandComplete length=%d, tag=%s", length, name) s.parseOffset += length m.end = s.parseOffset @@ -269,7 +268,7 @@ func (pgsql *pgsqlPlugin) parseReadyForQuery(s *pgsqlStream, length int) (bool, func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, bool) { // ErrorResponse - detailedf("ErrorResponse") + pgsql.detailf("ErrorResponse") m := s.message m.start = s.parseOffset @@ -278,7 +277,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, m.toExport = true s.parseOffset++ //type - pgsqlErrorParser(s, s.data[s.parseOffset+4:s.parseOffset+length]) + pgsql.parseError(s, s.data[s.parseOffset+4:s.parseOffset+length]) s.parseOffset += length //length m.end = s.parseOffset @@ -289,7 +288,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) { // Ready for query -> Parse for an extended query request - detailedf("Parse") + pgsql.detailf("Parse") m := s.message m.start = s.parseOffset @@ -303,11 +302,11 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) { query, err := common.ReadString(s.data[m.start+6:]) if err != nil { - detailedf("Invalid extended query request") + pgsql.detailf("Invalid extended query request") 
return false, false } m.query = query - detailedf("Parse in an extended query request: %s", m.query) + pgsql.detailf("Parse in an extended query request: %s", m.query) // Ignore SET statement if strings.HasPrefix(m.query, "SET ") { @@ -319,7 +318,7 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) { func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) { // Sync -> Parse completion for an extended query response - detailedf("ParseCompletion") + pgsql.detailf("ParseCompletion") m := s.message m.start = s.parseOffset @@ -329,7 +328,7 @@ func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) s.parseOffset++ //type s.parseOffset += length - detailedf("Parse completion in an extended query response") + pgsql.detailf("Parse completion in an extended query response") s.parseState = pgsqlGetDataState return pgsql.parseMessageData(s) } @@ -349,7 +348,7 @@ func (pgsql *pgsqlPlugin) parseSkipMessage(s *pgsqlStream, length int) (bool, bo return true, true } -func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error { +func (pgsql *pgsqlPlugin) parseFields(s *pgsqlStream, buf []byte) error { m := s.message if len(buf) < 2 { @@ -359,7 +358,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error { // read field count (int16) off := 2 fieldCount := readCount(buf) - detailedf("Row Description field count=%d", fieldCount) fields := []string{} fieldsFormat := []byte{} @@ -400,8 +398,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error { format := common.BytesNtohs(buf[off : off+2]) off += 2 fieldsFormat = append(fieldsFormat, byte(format)) - - detailedf("Field name=%s, format=%d", fieldName, format) } if off < len(buf) { @@ -411,13 +407,13 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error { m.fields = fields m.fieldsFormat = fieldsFormat if m.numberOfFields != fieldCount { - logp.Err("Missing fields from RowDescription. Expected %d. Received %d", + pgsql.log.Errorf("Missing fields from RowDescription. Expected %d. 
Received %d", fieldCount, m.numberOfFields) } return nil } -func pgsqlErrorParser(s *pgsqlStream, buf []byte) { +func (pgsql *pgsqlPlugin) parseError(s *pgsqlStream, buf []byte) { m := s.message off := 0 for off < len(buf) { @@ -430,7 +426,7 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) { // read field value(string) val, err := common.ReadString(buf[off+1:]) if err != nil { - logp.Err("Failed to read the column field") + pgsql.log.Error("Failed to read the column field") break } off += len(val) + 2 @@ -444,11 +440,11 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) { m.errorSeverity = val } } - detailedf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo) + pgsql.detailf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo) } func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) { - detailedf("parseMessageData") + pgsql.detailf("parseMessageData") // The response to queries that return row sets contains: // RowDescription @@ -466,12 +462,12 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) { length := readLength(s.data[s.parseOffset+1:]) if length < 4 { // length should include the size of itself (int32) - detailedf("Invalid pgsql command length.") + pgsql.detailf("Invalid pgsql command length.") return false, false } if len(s.data[s.parseOffset:]) <= length { // wait for more - detailedf("Wait for more data") + pgsql.detailf("Wait for more data") return true, false } @@ -491,17 +487,17 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) { name, err := pgsqlString(s.data[s.parseOffset+4:], length-4) if err != nil { - detailedf("pgsql string invalid") + pgsql.detailf("pgsql string invalid") return false, false } - detailedf("CommandComplete length=%d, tag=%s", length, name) + pgsql.detailf("CommandComplete length=%d, tag=%s", length, name) s.parseOffset += length m.end = s.parseOffset m.size = uint64(m.end - m.start) s.parseState = pgsqlStartState - detailedf("Rows: %s", m.rows) + pgsql.detailf("Rows: %s", m.rows) return true, true case '2': @@ -515,7 +511,7 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) { return pgsql.parseRowDescription(s, length) default: // shouldn't happen -> return error - logp.Warn("Pgsql parser expected data message, but received command of type %v", typ) + pgsql.log.Warnf("Pgsql parser expected data message, but received command of type %v", typ) s.parseState = pgsqlStartState return false, false } @@ -530,7 +526,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error { // read field count (int16) off := 2 fieldCount := readCount(buf) - detailedf("DataRow field count=%d", fieldCount) + pgsql.detailf("DataRow field count=%d", fieldCount) rows := []string{} rowLength := 0 @@ -545,7 +541,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error { off += 4 if columnLength > 0 && columnLength > len(buf[off:]) { - logp.Err("Pgsql invalid column_length=%v, buffer_length=%v, i=%v", + pgsql.log.Errorf("Pgsql invalid column_length=%v, buffer_length=%v, i=%v", columnLength, len(buf[off:]), i) return errInvalidLength } @@ -568,7 +564,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error { rowLength += len(columnValue) } - detailedf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off) + pgsql.detailf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off) } if off < len(buf) { @@ -584,7 +580,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error { } 
func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) { - detailedf("parseMessageExtendedQuery") + pgsql.detailf("parseMessageExtendedQuery") // An extended query request contains: // Parse @@ -603,12 +599,12 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) length := readLength(s.data[s.parseOffset+1:]) if length < 4 { // length should include the size of itself (int32) - detailedf("Invalid pgsql command length.") + pgsql.detailf("Invalid pgsql command length.") return false, false } if len(s.data[s.parseOffset:]) <= length { // wait for more - detailedf("Wait for more data") + pgsql.detailf("Wait for more data") return true, false } @@ -647,7 +643,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) return true, true default: // shouldn't happen -> return error - logp.Warn("Pgsql parser expected extended query message, but received command of type %v", typ) + pgsql.log.Warnf("Pgsql parser expected extended query message, but received command of type %v", typ) s.parseState = pgsqlStartState return false, false } @@ -656,7 +652,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) return true, false } -func isSpecialPgsqlCommand(data []byte) (bool, int, int) { +func (pgsql *pgsqlPlugin) isSpecialCommand(data []byte) (bool, int, int) { if len(data) < 8 { // 8 bytes required return false, 0, 0 @@ -670,15 +666,15 @@ func isSpecialPgsqlCommand(data []byte) (bool, int, int) { if length == 16 && code == 80877102 { // Cancel Request - logp.Debug("pgsqldetailed", "Cancel Request, length=%d", length) + pgsql.debugf("Cancel Request, length=%d", length) return true, length, cancelRequest } else if length == 8 && code == 80877103 { // SSL Request - logp.Debug("pgsqldetailed", "SSL Request, length=%d", length) + pgsql.debugf("SSL Request, length=%d", length) return true, length, sslRequest } else if code == 196608 { // Startup Message - logp.Debug("pgsqldetailed", "Startup Message, length=%d", length) + pgsql.debugf("Startup Message, length=%d", length) return true, length, startupMessage } return false, 0, 0 diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index f48731fee72f..a16c07d64b9a 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -30,9 +30,13 @@ import ( "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + + "go.uber.org/zap" ) type pgsqlPlugin struct { + log, debug, detail *logp.Logger + isDebug, isDetail bool // config ports []int @@ -125,11 +129,6 @@ var ( errInvalidLength = errors.New("invalid length") ) -var ( - debugf = logp.MakeDebug("pgsql") - detailedf = logp.MakeDebug("pgsqldetailed") -) - var ( unmatchedResponses = monitoring.NewInt(nil, "pgsql.unmatched_responses") ) @@ -160,6 +159,11 @@ func New( func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) error { pgsql.setFromConfig(config) + pgsql.log = logp.NewLogger("pgsql") + pgsql.debug = logp.NewLogger("pgsql", zap.AddCallerSkip(1)) + pgsql.detail = logp.NewLogger("pgsqldetailed", zap.AddCallerSkip(1)) + pgsql.isDebug, pgsql.isDetail = logp.IsDebug("pgsql"), logp.IsDebug("pgsqldetailed") + pgsql.transactions = common.NewCache( pgsql.transactionTimeout, protos.DefaultTransactionHashSize) @@ -187,6 +191,20 @@ func (pgsql *pgsqlPlugin) getTransaction(k common.HashableTCPTuple) []*pgsqlTran return nil } +//go:inline +func (pgsql 
*pgsqlPlugin) debugf(format string, v ...interface{}) { + if pgsql.isDebug { + pgsql.debug.Debugf(format, v...) + } +} + +//go:inline +func (pgsql *pgsqlPlugin) detailf(format string, v ...interface{}) { + if pgsql.isDetail { + pgsql.detail.Debugf(format, v...) + } +} + func (pgsql *pgsqlPlugin) GetPorts() []int { return pgsql.ports } @@ -237,13 +255,13 @@ func (pgsql *pgsqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, data: pkt.Payload, message: &pgsqlMessage{ts: pkt.Ts}, } - logp.Debug("pgsqldetailed", "New stream created") + pgsql.detailf("New stream created") } else { // concatenate bytes priv.data[dir].data = append(priv.data[dir].data, pkt.Payload...) - logp.Debug("pgsqldetailed", "Len data: %d cap data: %d", len(priv.data[dir].data), cap(priv.data[dir].data)) + pgsql.detailf("Len data: %d cap data: %d", len(priv.data[dir].data), cap(priv.data[dir].data)) if len(priv.data[dir].data) > tcp.TCPMaxDataInStream { - debugf("Stream data too large, dropping TCP stream") + pgsql.debugf("Stream data too large, dropping TCP stream") priv.data[dir] = nil return priv } @@ -262,12 +280,11 @@ func (pgsql *pgsqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, } ok, complete := pgsql.pgsqlMessageParser(priv.data[dir]) - //logp.Debug("pgsqldetailed", "MessageParser returned ok=%v complete=%v", ok, complete) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it priv.data[dir] = nil - debugf("Ignore Postgresql message. Drop tcp stream. Try parsing with the next segment") + pgsql.debugf("Ignore Postgresql message. Drop tcp stream. Try parsing with the next segment") return priv } @@ -333,7 +350,7 @@ func (pgsql *pgsqlPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, // next layer but mark it as incomplete. stream := pgsqlData.data[dir] if messageHasEnoughData(stream.message) { - debugf("Message not complete, but sending to the next layer") + pgsql.debugf("Message not complete, but sending to the next layer") m := stream.message m.toExport = true m.end = stream.parseOffset @@ -378,7 +395,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlRequest(msg *pgsqlMessage) { // separated by ';' queries := pgsqlQueryParser(msg.query) - logp.Debug("pgsqldetailed", "Queries (%d) :%s", len(queries), queries) + pgsql.debugf("Queries (%d) :%s", len(queries), queries) transList := pgsql.getTransaction(tuple.Hashable()) if transList == nil { @@ -414,7 +431,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) { tuple := msg.tcpTuple transList := pgsql.getTransaction(tuple.Hashable()) if transList == nil || len(transList) == 0 { - debugf("Response from unknown transaction. Ignoring.") + pgsql.debugf("Response from unknown transaction. Ignoring.") unmatchedResponses.Add(1) return } @@ -424,7 +441,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) { // check if the request was received if trans.pgsql == nil { - debugf("Response from unknown transaction. Ignoring.") + pgsql.debugf("Response from unknown transaction. 
Ignoring.") unmatchedResponses.Add(1) return } @@ -449,7 +466,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) { pgsql.publishTransaction(trans) - debugf("Postgres transaction completed: %s\n%s", trans.pgsql, trans.responseRaw) + pgsql.debugf("Postgres transaction completed: %s\n%s", trans.pgsql, trans.responseRaw) } func (pgsql *pgsqlPlugin) publishTransaction(t *pgsqlTransaction) { From 59378cd6e29d13e454d2cfcfd16ab22b291677f8 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 10 May 2019 20:07:41 +0200 Subject: [PATCH 3/3] Ignore doc type in ES search API for ES 8 (#12171) --- libbeat/outputs/elasticsearch/api.go | 5 +++++ libbeat/outputs/elasticsearch/api_integration_test.go | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/elasticsearch/api.go b/libbeat/outputs/elasticsearch/api.go index a0f2d0a9d0f7..91c15cf2a468 100644 --- a/libbeat/outputs/elasticsearch/api.go +++ b/libbeat/outputs/elasticsearch/api.go @@ -23,6 +23,8 @@ import ( "strconv" "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/common" ) // QueryResult contains the result of a query. @@ -235,6 +237,9 @@ func (es *Connection) SearchURIWithBody( params map[string]string, body interface{}, ) (int, *SearchResults, error) { + if !es.version.LessThan(&common.Version{Major: 8}) { + docType = "" + } status, resp, err := es.apiCall("GET", index, docType, "_search", "", params, body) if err != nil { return status, nil, err diff --git a/libbeat/outputs/elasticsearch/api_integration_test.go b/libbeat/outputs/elasticsearch/api_integration_test.go index 2ee8d8a123a7..31903cf60e5d 100644 --- a/libbeat/outputs/elasticsearch/api_integration_test.go +++ b/libbeat/outputs/elasticsearch/api_integration_test.go @@ -61,7 +61,7 @@ func TestIndex(t *testing.T) { } _, result, err := client.SearchURIWithBody(index, "", nil, map[string]interface{}{}) if err != nil { - t.Errorf("SearchUriWithBody() returns an error: %s", err) + t.Fatalf("SearchUriWithBody() returns an error: %s", err) } if result.Hits.Total.Value != 1 { t.Errorf("Wrong number of search results: %d", result.Hits.Total.Value) @@ -72,7 +72,7 @@ func TestIndex(t *testing.T) { } _, result, err = client.SearchURI(index, "test", params) if err != nil { - t.Errorf("SearchUri() returns an error: %s", err) + t.Fatalf("SearchUri() returns an error: %s", err) } if result.Hits.Total.Value != 1 { t.Errorf("Wrong number of search results: %d", result.Hits.Total.Value)