Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #10945 to 7.0: [Filebeat] NetFlow input support for custom field definitions #11223

Merged
merged 2 commits into from
Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di
*Filebeat*

- Add ISO8601 timestamp support in syslog metricset. {issue}8716[8716] {pull}10736[10736]
- Add support for loading custom NetFlow and IPFIX field definitions to netflow input. {pull}10945[10945] {pull}11223[11223]

*Heartbeat*

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/_meta/common.reference.inputs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,8 @@
# processing.
#queue_size: 8192

# Custom field definitions for NetFlow V9 / IPFIX.
# List of files with YAML fields definition.
#custom_definitions:
#- path/to/ipfix.yaml
#- path/to/netflow.yaml
48 changes: 48 additions & 0 deletions x-pack/filebeat/docs/inputs/input-netflow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Example configuration:
protocols: [ v5, v9, ipfix ]
expiration_timeout: 30m
queue_size: 8192
custom_definitions:
- path/to/fields.yml
----


Expand Down Expand Up @@ -61,6 +63,52 @@ The maximum number of packets that can be queued for processing.
Use this setting to avoid packet-loss when dealing with occasional bursts
of traffic.

[float]
[[custom_definitions]]
==== `custom_definitions`

A list of paths to field definitions YAML files. These allow to update the
NetFlow/IPFIX fields with vendor extensions and to override existing fields.

The expected format is the same as used by Logstash's NetFlow codec
{logstash-ref}/plugins-codecs-netflow.html#plugins-codecs-netflow-ipfix_definitions[ipfix_definitions]
and {logstash-ref}/plugins-codecs-netflow.html#plugins-codecs-netflow-netflow_definitions[netflow_definitions].
{beatname} will detect which of the two formats is used.

NetFlow format example:
["source","yaml",subs="attributes"]
id:
- default length in bytes
- :name
id:
- :uintN or :intN: or :ip4_addr or :ip6_addr or :mac_addr or :string
- :name
id:
- :skip


Where `id` is the numeric field ID.

The IPFIX format similar, but grouped by Private Enterprise Number (PEN):
["source","yaml",subs="attributes"]
pen1:
id:
- :uintN or :ip4_addr or :ip6_addr or :mac_addr or :string
- :name
id:
- :skip
pen2:
id:
- :octetarray
- :name

Note that fields are shared between NetFlow V9 and IPFIX. Changes to
IPFIX PEN zero are equivalent to changes to NetFlow fields.

[WARNING]
Overriding the names and/or types of standard fields can prevent
mapping of ECS fields to function properly.

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,11 @@ filebeat.inputs:
# processing.
#queue_size: 8192

# Custom field definitions for NetFlow V9 / IPFIX.
# List of files with YAML fields definition.
#custom_definitions:
#- path/to/ipfix.yaml
#- path/to/netflow.yaml
#========================== Filebeat autodiscover ==============================

# Autodiscover allows you to detect changes in the system and spawn new modules
Expand Down
17 changes: 9 additions & 8 deletions x-pack/filebeat/input/netflow/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ import (
"github.com/elastic/beats/filebeat/inputsource/udp"
)

type config struct {
udp.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`
Protocols []string `config:"protocols"`
ExpirationTimeout time.Duration `config:"expiration_timeout"`
PacketQueueSize int `config:"queue_size"`
CustomDefinitions []string `config:"custom_definitions"`
}

var defaultConfig = config{
Config: udp.Config{
MaxMessageSize: 10 * humanize.KiByte,
Expand All @@ -26,11 +35,3 @@ var defaultConfig = config{
ExpirationTimeout: time.Minute * 30,
PacketQueueSize: 8192,
}

type config struct {
udp.Config `config:",inline"`
harvester.ForwarderConfig `config:",inline"`
Protocols []string `config:"protocols"`
ExpirationTimeout time.Duration `config:"expiration_timeout"`
PacketQueueSize int `config:"queue_size"`
}
28 changes: 28 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"io"
"io/ioutil"
"time"

"github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/fields"
)

// Config stores the configuration used by the NetFlow Collector.
Expand All @@ -16,6 +18,7 @@ type Config struct {
logOutput io.Writer
expiration time.Duration
detectReset bool
fields fields.FieldDict
}

var defaultCfg = Config{
Expand Down Expand Up @@ -61,6 +64,23 @@ func (c *Config) WithSequenceResetEnabled(enabled bool) *Config {
return c
}

// WithCustomFields extends the NetFlow V9/IPFIX supported fields with
// custom ones. This method can be chained multiple times adding fields
// from different sources.
func (c *Config) WithCustomFields(dicts ...fields.FieldDict) *Config {
if len(dicts) == 0 {
return c
}
if c.fields == nil {
c.fields = fields.FieldDict{}
c.fields.Merge(fields.GlobalFields)
}
for _, dict := range dicts {
c.fields.Merge(dict)
}
return c
}

// Protocols returns a list of the protocols enabled.
func (c *Config) Protocols() []string {
return c.protocols
Expand All @@ -81,3 +101,11 @@ func (c *Config) ExpirationTimeout() time.Duration {
func (c *Config) SequenceResetEnabled() bool {
return c.detectReset
}

// Fields returns the configured fields.
func (c *Config) Fields() fields.FieldDict {
if c.fields == nil {
return fields.GlobalFields
}
return c.fields
}
16 changes: 12 additions & 4 deletions x-pack/filebeat/input/netflow/decoder/fields/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package fields

import "fmt"

var Fields = FieldDict{}
var GlobalFields = FieldDict{}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported var GlobalFields should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported var GlobalFields should have comment or be unexported


type Key struct {
EnterpriseID uint32
Expand All @@ -20,12 +20,20 @@ type Field struct {

type FieldDict map[Key]*Field

func RegisterFields(dict FieldDict) error {
func RegisterGlobalFields(dict FieldDict) error {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported function RegisterGlobalFields should have comment or be unexported

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported function RegisterGlobalFields should have comment or be unexported

for key, value := range dict {
if _, found := Fields[key]; found {
if _, found := GlobalFields[key]; found {
return fmt.Errorf("field %+v is duplicated", key)
}
Fields[key] = value
GlobalFields[key] = value
}
return nil
}

// Merge merges the passed fields into the dictionary, overwriting existing
// fields if duplicated.
func (f FieldDict) Merge(otherFields FieldDict) {
for key, value := range otherFields {
f[key] = value
}
}
58 changes: 58 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/fields/field_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package fields

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestFieldDict_Merge(t *testing.T) {
a := FieldDict{
Key{1, 2}: &Field{"field1", String},
Key{2, 3}: &Field{"field2", Unsigned32},
}
b := FieldDict{
Key{3, 4}: &Field{"field3", MacAddress},
Key{4, 5}: &Field{"field4", Ipv4Address},
Key{5, 6}: &Field{"field5", Ipv6Address},
}
c := FieldDict{
Key{3, 4}: &Field{"field3v2", OctetArray},
Key{0, 0}: &Field{"field0", DateTimeMicroseconds},
}

f := FieldDict{}

f.Merge(a)

assert.Len(t, f, len(a))
if !checkContains(t, f, a) {
t.FailNow()
}

f.Merge(b)
assert.Len(t, f, len(a)+len(b))
if !checkContains(t, f, b) {
t.FailNow()
}

f.Merge(c)
assert.Len(t, f, len(a)+len(b)+len(c)-1)
if !checkContains(t, f, c) {
t.FailNow()
}

}

func checkContains(t testing.TB, dest FieldDict, contains FieldDict) bool {
for k, v := range contains {
if !assert.Contains(t, dest, k) || !assert.Equal(t, *v, *dest[k]) {
return false
}
}
return true
}
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ var AssortedFields = FieldDict{
}

func init() {
if err := RegisterFields(AssortedFields); err != nil {
if err := RegisterGlobalFields(AssortedFields); err != nil {
panic(err)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ var CertFields = FieldDict{
}

func init() {
if err := RegisterFields(CertFields); err != nil {
if err := RegisterGlobalFields(CertFields); err != nil {
panic(err)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ var CiscoFields = FieldDict{
}

func init() {
if err := RegisterFields(CiscoFields); err != nil {
if err := RegisterGlobalFields(CiscoFields); err != nil {
panic(err)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ var IpfixFields = FieldDict{
}

func init() {
if err := RegisterFields(IpfixFields); err != nil {
if err := RegisterGlobalFields(IpfixFields); err != nil {
panic(err)
}
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func init() {
func New(config config.Config) protocol.Protocol {
logger := log.New(config.LogOutput(), LogPrefix, 0)
decoder := DecoderIPFIX{
DecoderV9: v9.DecoderV9{Logger: logger},
DecoderV9: v9.DecoderV9{Logger: logger, Fields: config.Fields()},
}
proto := &IPFixProtocol{
NetflowV9Protocol: *v9.NewProtocolWithDecoder(decoder, config, logger),
Expand Down
45 changes: 45 additions & 0 deletions x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/test"
"github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/v9"
Expand Down Expand Up @@ -188,3 +189,47 @@ func TestOptionTemplates(t *testing.T) {
assert.Len(t, s.Templates, 1)
})
}

func TestCustomFields(t *testing.T) {
addr := test.MakeAddress(t, "127.0.0.1:12345")

conf := config.Defaults()
conf.WithCustomFields(fields.FieldDict{
fields.Key{EnterpriseID: 0x12345678, FieldID: 33}: &fields.Field{Name: "customField", Decoder: fields.String},
})
assert.Contains(t, conf.Fields(), fields.Key{EnterpriseID: 0x12345678, FieldID: 33})
proto := New(conf)
flows, err := proto.OnPacket(test.MakePacket([]uint16{
// Header
// Version, Length, Ts, SeqNo, Source
10, 42, 11, 11, 22, 22, 0, 1234,
// Set #1 (record template)
2, 26, /*len of set*/
999, 3,
1, 4, // Field 1
2, 4, // Field 2
// Field 3
0x8000 | 33, 6,
0x1234, 0x5678, // enterprise ID
0, // Padding
}), addr)
assert.NoError(t, err)
assert.Empty(t, flows)

flows, err = proto.OnPacket(test.MakePacket([]uint16{
// Header
// Version, Length, Ts, SeqNo, Source
10, 34, 11, 11, 22, 22, 0, 1234,
// Set (data record)
999, 18, /*len of 999 record */
0x0102, 0x0304, // field 1
0x0506, 0x0708, // field 2
// Field 3
0x5465, 0x7374,
0x4d65,
}), addr)
assert.NoError(t, err)
assert.Len(t, flows, 1)
assert.Contains(t, flows[0].Fields, "customField")
assert.Equal(t, flows[0].Fields["customField"], "TestMe")
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var (
)

func buildDecoderByNameMap() {
for _, value := range fields.Fields {
for _, value := range fields.GlobalFields {
decoderByName[value.Name] = value.Decoder
}
}
Expand Down
Loading