Skip to content

Commit

Permalink
Introduce add_labels and add_tags processors (#9973)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Jan 11, 2019
1 parent 05d7f5e commit 14373c7
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update field definitions for `http` to ECS Beta 2 {pull}9645[9645]
- Add `agent.id` and `agent.ephemeral_id` fields to all beats. {pull}9404[9404]
- Add `name` config option to `add_host_metadata` processor. {pull}9943[9943]
- Add `add_labels` and `add_tags` processors. {pull}9973[9973]

*Auditbeat*

Expand Down
91 changes: 91 additions & 0 deletions libbeat/processors/actions/add_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"encoding/json"
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

type addLabels struct {
labels common.MapStr
shared bool
}

// LabelsKey is the default target key for the add_labels processor.
const LabelsKey = "labels"

func init() {
processors.RegisterPlugin("add_labels",
configChecked(createAddLabels,
requireFields("labels"),
allowedFields("labels", "when")))
}

func createAddLabels(c *common.Config) (processors.Processor, error) {
config := struct {
Labels common.MapStr `config:"labels" validate:"required"`
Target *string `config:"target"`
}{}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the add_fields configuration: %s", err)
}

var target string
if config.Target == nil {
target = LabelsKey
} else {
target = *config.Target
}

labels := config.Labels
if target != "" {
labels = common.MapStr{
target: labels,
}
}

return NewAddLabels(labels, true), nil
}

// NewAddLabels creates a new processor adding the given object to events. Set
// `shared` true if there is the chance of labels being changed/modified by
// subsequent processors.
func NewAddLabels(labels common.MapStr, shared bool) processors.Processor {
return &addLabels{labels: labels, shared: shared}
}

func (af *addLabels) Run(event *beat.Event) (*beat.Event, error) {
labels := af.labels
if af.shared {
labels = labels.Clone()
}

event.Fields.DeepUpdate(labels)
return event, nil
}

func (af *addLabels) String() string {
s, _ := json.Marshal(af.labels)
return fmt.Sprintf("add_labels=%s", s)
}
128 changes: 128 additions & 0 deletions libbeat/processors/actions/add_labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

func TestAddLabels(t *testing.T) {
multi := func(strs ...string) []string { return strs }
single := func(str string) []string { return multi(str) }

cases := map[string]struct {
event common.MapStr
want common.MapStr
cfg []string
}{
"add label": {
event: common.MapStr{},
want: common.MapStr{
"labels": common.MapStr{"label": "test"},
},
cfg: single(`{labels: {label: test}}`),
},
"custom target": {
event: common.MapStr{},
want: common.MapStr{
"my": common.MapStr{"label": "test"},
},
cfg: single(`{target: my, labels: {label: test}}`),
},
"overwrite existing label": {
event: common.MapStr{
"labels": common.MapStr{"label": "old"},
},
want: common.MapStr{
"labels": common.MapStr{"label": "test"},
},
cfg: single(`{labels: {label: test}}`),
},
"merge with existing labels": {
event: common.MapStr{
"labels": common.MapStr{"existing": "a"},
},
want: common.MapStr{
"labels": common.MapStr{"existing": "a", "label": "test"},
},
cfg: single(`{labels: {label: test}}`),
},
"combine 2 processors": {
event: common.MapStr{},
want: common.MapStr{
"labels": common.MapStr{
"l1": "a",
"l2": "b",
},
},
cfg: multi(
`{labels: {l1: a}}`,
`{labels: {l2: b}}`,
),
},
"different targets": {
event: common.MapStr{},
want: common.MapStr{
"a": common.MapStr{"l1": "a"},
"b": common.MapStr{"l2": "b"},
},
cfg: multi(
`{target: a, labels: {l1: a}}`,
`{target: b, labels: {l2: b}}`,
),
},
}

for name, test := range cases {
test := test
t.Run(name, func(t *testing.T) {
processors := make([]processors.Processor, len(test.cfg))
for i := range test.cfg {
config, err := common.NewConfigWithYAML([]byte(test.cfg[i]), "test")
if err != nil {
t.Fatalf("Failed to create config(%v): %+v", i, err)
}

processors[i], err = createAddLabels(config)
if err != nil {
t.Fatalf("Failed to create add_tags processor(%v): %+v", i, err)
}
}

current := &beat.Event{Fields: test.event.Clone()}
for i, processor := range processors {
var err error
current, err = processor.Run(current)
if err != nil {
t.Fatalf("Unexpected error from add_tags processor(%v): %+v", i, err)
}
if current == nil {
t.Fatalf("Event dropped(%v)", i)
}
}

assert.Equal(t, test.want, current.Fields)
})
}
}
81 changes: 81 additions & 0 deletions libbeat/processors/actions/add_tags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"fmt"
"strings"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

type addTags struct {
tags []string
target string
}

func init() {
processors.RegisterPlugin("add_tags",
configChecked(createAddTags,
requireFields("tags"),
allowedFields("tags", "when")))
}

func createAddTags(c *common.Config) (processors.Processor, error) {
config := struct {
Tags []string `config:"tags" validate:"required"`
Target string `config:"target"`
}{}

err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the add_tags configuration: %s", err)
}

return NewAddTags(config.Target, config.Tags), nil
}

// NewAddTags creates a new processor for adding tags to a field.
// If the target field already contains tags, then the new tags will be
// appended to the existing list of tags.
func NewAddTags(target string, tags []string) processors.Processor {
if target == "" {
target = common.TagsKey
}

// make sure capacity == length such that different processors adding more tags
// do not change/overwrite each other on append
if cap(tags) != len(tags) {
tmp := make([]string, len(tags), len(tags))
copy(tmp, tags)
tags = tmp
}

return &addTags{tags: tags, target: target}
}

func (at *addTags) Run(event *beat.Event) (*beat.Event, error) {
common.AddTagsWithKey(event.Fields, at.target, at.tags)
return event, nil
}

func (at *addTags) String() string {
return fmt.Sprintf("add_tags=%v", strings.Join(at.tags, ","))
}
Loading

0 comments on commit 14373c7

Please sign in to comment.