Skip to content

Commit

Permalink
Separate add_fields and add_labels source files
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jan 21, 2019
1 parent 451ac6a commit 9ec0ba0
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 162 deletions.
98 changes: 98 additions & 0 deletions libbeat/processors/actions/add_fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"encoding/json"
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

type addFields struct {
fields common.MapStr
shared bool
}

// FieldsKey is the default target key for the add_fields processor.
const FieldsKey = "fields"

func init() {
processors.RegisterPlugin("add_fields",
configChecked(createAddFields,
requireFields(FieldsKey),
allowedFields(FieldsKey, "target", "when")))
}

func createAddFields(c *common.Config) (processors.Processor, error) {
config := struct {
Fields common.MapStr `config:"fields" validate:"required"`
Target *string `config:"target"`
}{}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the add_fields configuration: %s", err)
}

return makeFieldsProcessor(
optTarget(config.Target, FieldsKey),
config.Fields,
true,
), nil
}

// NewAddFields creates a new processor adding the given fields to events.
// Set `shared` true if there is the chance of labels being changed/modified by
// subsequent processors.
func NewAddFields(fields common.MapStr, shared bool) processors.Processor {
return &addFields{fields: fields, shared: shared}
}

func (af *addFields) Run(event *beat.Event) (*beat.Event, error) {
fields := af.fields
if af.shared {
fields = fields.Clone()
}

event.Fields.DeepUpdate(fields)
return event, nil
}

func (af *addFields) String() string {
s, _ := json.Marshal(af.fields)
return fmt.Sprintf("add_fields=%s", s)
}

func optTarget(opt *string, def string) string {
if opt == nil {
return def
}
return *opt
}

func makeFieldsProcessor(target string, fields common.MapStr, shared bool) processors.Processor {
if target != "" {
fields = common.MapStr{
target: fields,
}
}

return NewAddFields(fields, shared)
}
117 changes: 117 additions & 0 deletions libbeat/processors/actions/add_fields_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package actions

import (
"testing"

"github.com/elastic/beats/libbeat/common"
)

func TestAddFields(t *testing.T) {
multi := func(strs ...string) []string { return strs }
single := func(str string) []string { return multi(str) }

testProcessors(t, map[string]testCase{
"add field": {
event: common.MapStr{},
want: common.MapStr{
"fields": common.MapStr{"field": "test"},
},
cfg: single(`{add_fields: {fields: {field: test}}}`),
},
"custom target": {
event: common.MapStr{},
want: common.MapStr{
"my": common.MapStr{"field": "test"},
},
cfg: single(`{add_fields: {target: my, fields: {field: test}}}`),
},
"overwrite existing field": {
event: common.MapStr{
"fields": common.MapStr{"field": "old"},
},
want: common.MapStr{"fields": common.MapStr{"field": "test"}},
cfg: single(`{add_fields: {fields: {field: test}}}`),
},
"merge with existing fields": {
event: common.MapStr{
"fields": common.MapStr{"existing": "a"},
},
want: common.MapStr{
"fields": common.MapStr{"existing": "a", "field": "test"},
},
cfg: single(`{add_fields: {fields: {field: test}}}`),
},
"combine 2 processors": {
event: common.MapStr{},
want: common.MapStr{
"fields": common.MapStr{
"l1": "a",
"l2": "b",
},
},
cfg: multi(
`{add_fields: {fields: {l1: a}}}`,
`{add_fields: {fields: {l2: b}}}`,
),
},
"different targets": {
event: common.MapStr{},
want: common.MapStr{
"a": common.MapStr{"l1": "a"},
"b": common.MapStr{"l2": "b"},
},
cfg: multi(
`{add_fields: {target: a, fields: {l1: a}}}`,
`{add_fields: {target: b, fields: {l2: b}}}`,
),
},
"under root": {
event: common.MapStr{},
want: common.MapStr{
"a": common.MapStr{"b": "test"},
},
cfg: single(
`{add_fields: {target: "", fields: {a.b: test}}}`,
),
},
"merge under root": {
event: common.MapStr{
"a": common.MapStr{"old": "value"},
},
want: common.MapStr{
"a": common.MapStr{"old": "value", "new": "test"},
},
cfg: single(
`{add_fields: {target: "", fields: {a.new: test}}}`,
),
},
"overwrite existing under root": {
event: common.MapStr{
"a": common.MapStr{"keep": "value", "change": "a"},
},
want: common.MapStr{
"a": common.MapStr{"keep": "value", "change": "b"},
},
cfg: single(
`{add_fields: {target: "", fields: {a.change: b}}}`,
),
},
})
}
69 changes: 0 additions & 69 deletions libbeat/processors/actions/add_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,20 @@
package actions

import (
"encoding/json"
"fmt"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)

type addFields struct {
fields common.MapStr
shared bool
}

// LabelsKey is the default target key for the add_labels processor.
const LabelsKey = "labels"
const FieldsKey = "fields"

func init() {
processors.RegisterPlugin("add_labels",
configChecked(createAddLabels,
requireFields(LabelsKey),
allowedFields(LabelsKey, "when")))

processors.RegisterPlugin("add_fields",
configChecked(createAddFields,
requireFields(FieldsKey),
allowedFields(FieldsKey, "target", "when")))
}

func createAddLabels(c *common.Config) (processors.Processor, error) {
Expand All @@ -59,40 +46,6 @@ func createAddLabels(c *common.Config) (processors.Processor, error) {
return makeFieldsProcessor(LabelsKey, config.Labels.Flatten(), true), nil
}

func createAddFields(c *common.Config) (processors.Processor, error) {
config := struct {
Fields common.MapStr `config:"fields" validate:"required"`
Target *string `config:"target"`
}{}
err := c.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("fail to unpack the add_fields configuration: %s", err)
}

return makeFieldsProcessor(
optTarget(config.Target, FieldsKey),
config.Fields,
true,
), nil
}

func optTarget(opt *string, def string) string {
if opt == nil {
return def
}
return *opt
}

func makeFieldsProcessor(target string, fields common.MapStr, shared bool) processors.Processor {
if target != "" {
fields = common.MapStr{
target: fields,
}
}

return NewAddFields(fields, shared)
}

// NewAddLabels creates a new processor adding the given object to events. Set
// `shared` true if there is the chance of labels being changed/modified by
// subsequent processors.
Expand All @@ -104,25 +57,3 @@ func NewAddLabels(labels common.MapStr, shared bool) processors.Processor {
LabelsKey: labels.Flatten(),
}, shared)
}

// NewAddFields creates a new processor adding the given fields to events.
// Set `shared` true if there is the chance of labels being changed/modified by
// subsequent processors.
func NewAddFields(fields common.MapStr, shared bool) processors.Processor {
return &addFields{fields: fields, shared: shared}
}

func (af *addFields) Run(event *beat.Event) (*beat.Event, error) {
fields := af.fields
if af.shared {
fields = fields.Clone()
}

event.Fields.DeepUpdate(fields)
return event, nil
}

func (af *addFields) String() string {
s, _ := json.Marshal(af.fields)
return fmt.Sprintf("add_fields=%s", s)
}
Loading

0 comments on commit 9ec0ba0

Please sign in to comment.