From cfa6a892d309b24d112002a6f9ea5d832b25c0cf Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 6 Mar 2019 11:14:23 +0100 Subject: [PATCH 1/2] [Filebeat] NetFlow input support for custom field definitions (#10945) This PR adds support for loading custom (enterprise-specific) fields to the Filebeat NetFlow input. These fields can extend and/or override fields in NetFlow V9 and IPFIX. For compatibility, the feature uses the same field definition YAML format as Logstash's netflow codec plugin. A new configuration option custom_definitions consists of a list of paths to definition files. (cherry picked from commit cd4907841d0f5e740cbb08eacb759dda3469911d) --- .../_meta/common.reference.inputs.yml | 5 + .../docs/inputs/input-netflow.asciidoc | 48 ++++ x-pack/filebeat/filebeat.reference.yml | 5 + x-pack/filebeat/input/netflow/config.go | 17 +- .../input/netflow/decoder/config/config.go | 28 +++ .../input/netflow/decoder/fields/field.go | 16 +- .../netflow/decoder/fields/field_test.go | 58 +++++ .../decoder/fields/zfields_assorted.go | 2 +- .../netflow/decoder/fields/zfields_cert.go | 2 +- .../netflow/decoder/fields/zfields_cisco.go | 2 +- .../netflow/decoder/fields/zfields_ipfix.go | 2 +- .../input/netflow/decoder/ipfix/ipfix.go | 2 +- .../input/netflow/decoder/ipfix/ipfix_test.go | 45 ++++ .../netflow/decoder/template/test_helpers.go | 2 +- .../input/netflow/decoder/v9/decoder.go | 12 +- .../filebeat/input/netflow/decoder/v9/v9.go | 2 +- .../input/netflow/decoder/v9/v9_test.go | 41 ++++ x-pack/filebeat/input/netflow/definitions.go | 217 ++++++++++++++++++ .../input/netflow/definitions_test.go | 110 +++++++++ x-pack/filebeat/input/netflow/input.go | 15 +- x-pack/filebeat/input/netflow/netflow_test.go | 24 +- .../input/netflow/testdata/dat_tests.yaml | 216 ++++++++++------- .../fields/netflow9_cisco_asa_custom.yaml | 27 +++ .../golden/Netflow-9-Cisco-ASA-2.golden.json | 31 ++- .../golden/Netflow-9-Cisco-ASA.golden.json | 126 ++++++++-- 25 files changed, 925 insertions(+), 130 deletions(-) create mode 100644 x-pack/filebeat/input/netflow/decoder/fields/field_test.go create mode 100644 x-pack/filebeat/input/netflow/definitions.go create mode 100644 x-pack/filebeat/input/netflow/definitions_test.go create mode 100644 x-pack/filebeat/input/netflow/testdata/fields/netflow9_cisco_asa_custom.yaml diff --git a/x-pack/filebeat/_meta/common.reference.inputs.yml b/x-pack/filebeat/_meta/common.reference.inputs.yml index 8d5c0c7a0f0f..ab42a166d3ae 100644 --- a/x-pack/filebeat/_meta/common.reference.inputs.yml +++ b/x-pack/filebeat/_meta/common.reference.inputs.yml @@ -22,3 +22,8 @@ # processing. #queue_size: 8192 + # Custom field definitions for NetFlow V9 / IPFIX. + # List of files with YAML fields definition. + #custom_definitions: + #- path/to/ipfix.yaml + #- path/to/netflow.yaml diff --git a/x-pack/filebeat/docs/inputs/input-netflow.asciidoc b/x-pack/filebeat/docs/inputs/input-netflow.asciidoc index a156b288028a..cebe980a7d69 100644 --- a/x-pack/filebeat/docs/inputs/input-netflow.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-netflow.asciidoc @@ -29,6 +29,8 @@ Example configuration: protocols: [ v5, v9, ipfix ] expiration_timeout: 30m queue_size: 8192 + custom_definitions: + - path/to/fields.yml ---- @@ -61,6 +63,52 @@ The maximum number of packets that can be queued for processing. Use this setting to avoid packet-loss when dealing with occasional bursts of traffic. +[float] +[[custom_definitions]] +==== `custom_definitions` + +A list of paths to field definitions YAML files. 
These files allow you to extend
+the NetFlow/IPFIX fields with vendor extensions and to override existing fields.
+
+The expected format is the same as used by Logstash's NetFlow codec
+{logstash-ref}/plugins-codecs-netflow.html#plugins-codecs-netflow-ipfix_definitions[ipfix_definitions]
+and {logstash-ref}/plugins-codecs-netflow.html#plugins-codecs-netflow-netflow_definitions[netflow_definitions].
+{beatname_uc} will detect which of the two formats is used.
+
+NetFlow format example:
+["source","yaml",subs="attributes"]
+----
+id:
+- default length in bytes
+- :name
+id:
+- :uintN or :intN or :ip4_addr or :ip6_addr or :mac_addr or :string
+- :name
+id:
+- :skip
+----
+
+Where `id` is the numeric field ID.
+
+The IPFIX format is similar, but grouped by Private Enterprise Number (PEN):
+["source","yaml",subs="attributes"]
+----
+pen1:
+  id:
+  - :uintN or :ip4_addr or :ip6_addr or :mac_addr or :string
+  - :name
+  id:
+  - :skip
+pen2:
+  id:
+  - :octetarray
+  - :name
+----
+
+Note that fields are shared between NetFlow V9 and IPFIX. Changes to
+IPFIX PEN zero are equivalent to changes to NetFlow fields.
+
+[WARNING]
+Overriding the names and/or types of standard fields can prevent the
+mapping of ECS fields from functioning properly.
+
 [id="{beatname_lc}-input-{type}-common-options"]
 include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml
index 07604623621b..531d2d01bfce 100644
--- a/x-pack/filebeat/filebeat.reference.yml
+++ b/x-pack/filebeat/filebeat.reference.yml
@@ -822,6 +822,11 @@ filebeat.inputs:
   # processing.
   #queue_size: 8192
 
+  # Custom field definitions for NetFlow V9 / IPFIX.
+  # List of files with YAML fields definition.
+  #custom_definitions:
+  #- path/to/ipfix.yaml
+  #- path/to/netflow.yaml
 
 #========================== Filebeat autodiscover ==============================
 
 # Autodiscover allows you to detect changes in the system and spawn new modules
diff --git a/x-pack/filebeat/input/netflow/config.go b/x-pack/filebeat/input/netflow/config.go
index da9ac9fba00a..e6250dd27dbb 100644
--- a/x-pack/filebeat/input/netflow/config.go
+++ b/x-pack/filebeat/input/netflow/config.go
@@ -13,6 +13,15 @@ import (
 	"github.com/elastic/beats/filebeat/inputsource/udp"
 )
 
+type config struct {
+	udp.Config                `config:",inline"`
+	harvester.ForwarderConfig `config:",inline"`
+	Protocols                 []string      `config:"protocols"`
+	ExpirationTimeout         time.Duration `config:"expiration_timeout"`
+	PacketQueueSize           int           `config:"queue_size"`
+	CustomDefinitions         []string      `config:"custom_definitions"`
+}
+
 var defaultConfig = config{
 	Config: udp.Config{
 		MaxMessageSize: 10 * humanize.KiByte,
@@ -26,11 +35,3 @@ var defaultConfig = config{
 	ExpirationTimeout: time.Minute * 30,
 	PacketQueueSize:   8192,
 }
-
-type config struct {
-	udp.Config                `config:",inline"`
-	harvester.ForwarderConfig `config:",inline"`
-	Protocols                 []string      `config:"protocols"`
-	ExpirationTimeout         time.Duration `config:"expiration_timeout"`
-	PacketQueueSize           int           `config:"queue_size"`
-}
diff --git a/x-pack/filebeat/input/netflow/decoder/config/config.go b/x-pack/filebeat/input/netflow/decoder/config/config.go
index c9ee073cdd19..fd124767184f 100644
--- a/x-pack/filebeat/input/netflow/decoder/config/config.go
+++ b/x-pack/filebeat/input/netflow/decoder/config/config.go
@@ -8,6 +8,8 @@ import (
 	"io"
 	"io/ioutil"
 	"time"
+
+	"github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/fields"
 )
 
 // Config stores the configuration used by the NetFlow Collector.
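//
// A sketch of the intended call pattern (illustrative only; it assumes the
// WithCustomFields option added just below, and two already-loaded
// fields.FieldDict values named vendorFields and siteOverrides). The option
// may be chained with the others and given several dictionaries; later
// dictionaries win on duplicate keys:
//
//	cfg := config.Defaults().
//		WithProtocols("v9", "ipfix").
//		WithCustomFields(vendorFields, siteOverrides)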
@@ -16,6 +18,7 @@ type Config struct { logOutput io.Writer expiration time.Duration detectReset bool + fields fields.FieldDict } var defaultCfg = Config{ @@ -61,6 +64,23 @@ func (c *Config) WithSequenceResetEnabled(enabled bool) *Config { return c } +// WithCustomFields extends the NetFlow V9/IPFIX supported fields with +// custom ones. This method can be chained multiple times adding fields +// from different sources. +func (c *Config) WithCustomFields(dicts ...fields.FieldDict) *Config { + if len(dicts) == 0 { + return c + } + if c.fields == nil { + c.fields = fields.FieldDict{} + c.fields.Merge(fields.GlobalFields) + } + for _, dict := range dicts { + c.fields.Merge(dict) + } + return c +} + // Protocols returns a list of the protocols enabled. func (c *Config) Protocols() []string { return c.protocols @@ -81,3 +101,11 @@ func (c *Config) ExpirationTimeout() time.Duration { func (c *Config) SequenceResetEnabled() bool { return c.detectReset } + +// Fields returns the configured fields. +func (c *Config) Fields() fields.FieldDict { + if c.fields == nil { + return fields.GlobalFields + } + return c.fields +} diff --git a/x-pack/filebeat/input/netflow/decoder/fields/field.go b/x-pack/filebeat/input/netflow/decoder/fields/field.go index 8106b7d43a11..2acaa8a9beec 100644 --- a/x-pack/filebeat/input/netflow/decoder/fields/field.go +++ b/x-pack/filebeat/input/netflow/decoder/fields/field.go @@ -6,7 +6,7 @@ package fields import "fmt" -var Fields = FieldDict{} +var GlobalFields = FieldDict{} type Key struct { EnterpriseID uint32 @@ -20,12 +20,20 @@ type Field struct { type FieldDict map[Key]*Field -func RegisterFields(dict FieldDict) error { +func RegisterGlobalFields(dict FieldDict) error { for key, value := range dict { - if _, found := Fields[key]; found { + if _, found := GlobalFields[key]; found { return fmt.Errorf("field %+v is duplicated", key) } - Fields[key] = value + GlobalFields[key] = value } return nil } + +// Merge merges the passed fields into the dictionary, overwriting existing +// fields if duplicated. +func (f FieldDict) Merge(otherFields FieldDict) { + for key, value := range otherFields { + f[key] = value + } +} diff --git a/x-pack/filebeat/input/netflow/decoder/fields/field_test.go b/x-pack/filebeat/input/netflow/decoder/fields/field_test.go new file mode 100644 index 000000000000..883a0c6903b8 --- /dev/null +++ b/x-pack/filebeat/input/netflow/decoder/fields/field_test.go @@ -0,0 +1,58 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
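+// A sketch of the override behavior exercised in this test file: Merge is
+// last-writer-wins, which is what lets custom definitions replace the
+// built-in ones (the custom variable below is illustrative):
+//
+//	d := FieldDict{}
+//	d.Merge(GlobalFields) // start from the standard definitions
+//	d.Merge(custom)       // duplicate keys now take the custom value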
+ +package fields + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFieldDict_Merge(t *testing.T) { + a := FieldDict{ + Key{1, 2}: &Field{"field1", String}, + Key{2, 3}: &Field{"field2", Unsigned32}, + } + b := FieldDict{ + Key{3, 4}: &Field{"field3", MacAddress}, + Key{4, 5}: &Field{"field4", Ipv4Address}, + Key{5, 6}: &Field{"field5", Ipv6Address}, + } + c := FieldDict{ + Key{3, 4}: &Field{"field3v2", OctetArray}, + Key{0, 0}: &Field{"field0", DateTimeMicroseconds}, + } + + f := FieldDict{} + + f.Merge(a) + + assert.Len(t, f, len(a)) + if !checkContains(t, f, a) { + t.FailNow() + } + + f.Merge(b) + assert.Len(t, f, len(a)+len(b)) + if !checkContains(t, f, b) { + t.FailNow() + } + + f.Merge(c) + assert.Len(t, f, len(a)+len(b)+len(c)-1) + if !checkContains(t, f, c) { + t.FailNow() + } + +} + +func checkContains(t testing.TB, dest FieldDict, contains FieldDict) bool { + for k, v := range contains { + if !assert.Contains(t, dest, k) || !assert.Equal(t, *v, *dest[k]) { + return false + } + } + return true +} diff --git a/x-pack/filebeat/input/netflow/decoder/fields/zfields_assorted.go b/x-pack/filebeat/input/netflow/decoder/fields/zfields_assorted.go index 92b60264dabd..6a08743e54d1 100644 --- a/x-pack/filebeat/input/netflow/decoder/fields/zfields_assorted.go +++ b/x-pack/filebeat/input/netflow/decoder/fields/zfields_assorted.go @@ -512,7 +512,7 @@ var AssortedFields = FieldDict{ } func init() { - if err := RegisterFields(AssortedFields); err != nil { + if err := RegisterGlobalFields(AssortedFields); err != nil { panic(err) } } diff --git a/x-pack/filebeat/input/netflow/decoder/fields/zfields_cert.go b/x-pack/filebeat/input/netflow/decoder/fields/zfields_cert.go index 21027203179a..1c65b7845505 100644 --- a/x-pack/filebeat/input/netflow/decoder/fields/zfields_cert.go +++ b/x-pack/filebeat/input/netflow/decoder/fields/zfields_cert.go @@ -109,7 +109,7 @@ var CertFields = FieldDict{ } func init() { - if err := RegisterFields(CertFields); err != nil { + if err := RegisterGlobalFields(CertFields); err != nil { panic(err) } } diff --git a/x-pack/filebeat/input/netflow/decoder/fields/zfields_cisco.go b/x-pack/filebeat/input/netflow/decoder/fields/zfields_cisco.go index e50c5e8f5893..05dce2458485 100644 --- a/x-pack/filebeat/input/netflow/decoder/fields/zfields_cisco.go +++ b/x-pack/filebeat/input/netflow/decoder/fields/zfields_cisco.go @@ -280,7 +280,7 @@ var CiscoFields = FieldDict{ } func init() { - if err := RegisterFields(CiscoFields); err != nil { + if err := RegisterGlobalFields(CiscoFields); err != nil { panic(err) } } diff --git a/x-pack/filebeat/input/netflow/decoder/fields/zfields_ipfix.go b/x-pack/filebeat/input/netflow/decoder/fields/zfields_ipfix.go index 87c2af478846..1a47ad39c269 100644 --- a/x-pack/filebeat/input/netflow/decoder/fields/zfields_ipfix.go +++ b/x-pack/filebeat/input/netflow/decoder/fields/zfields_ipfix.go @@ -469,7 +469,7 @@ var IpfixFields = FieldDict{ } func init() { - if err := RegisterFields(IpfixFields); err != nil { + if err := RegisterGlobalFields(IpfixFields); err != nil { panic(err) } } diff --git a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go index 754a39e95b2c..38214a5f5ac5 100644 --- a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go +++ b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go @@ -31,7 +31,7 @@ func init() { func New(config config.Config) protocol.Protocol { logger := log.New(config.LogOutput(), LogPrefix, 0) decoder := DecoderIPFIX{ - DecoderV9: 
v9.DecoderV9{Logger: logger}, + DecoderV9: v9.DecoderV9{Logger: logger, Fields: config.Fields()}, } proto := &IPFixProtocol{ NetflowV9Protocol: *v9.NewProtocolWithDecoder(decoder, config, logger), diff --git a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go index fca442708155..640889a0ea94 100644 --- a/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go +++ b/x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/config" + "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/test" "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/v9" @@ -188,3 +189,47 @@ func TestOptionTemplates(t *testing.T) { assert.Len(t, s.Templates, 1) }) } + +func TestCustomFields(t *testing.T) { + addr := test.MakeAddress(t, "127.0.0.1:12345") + + conf := config.Defaults() + conf.WithCustomFields(fields.FieldDict{ + fields.Key{EnterpriseID: 0x12345678, FieldID: 33}: &fields.Field{Name: "customField", Decoder: fields.String}, + }) + assert.Contains(t, conf.Fields(), fields.Key{EnterpriseID: 0x12345678, FieldID: 33}) + proto := New(conf) + flows, err := proto.OnPacket(test.MakePacket([]uint16{ + // Header + // Version, Length, Ts, SeqNo, Source + 10, 42, 11, 11, 22, 22, 0, 1234, + // Set #1 (record template) + 2, 26, /*len of set*/ + 999, 3, + 1, 4, // Field 1 + 2, 4, // Field 2 + // Field 3 + 0x8000 | 33, 6, + 0x1234, 0x5678, // enterprise ID + 0, // Padding + }), addr) + assert.NoError(t, err) + assert.Empty(t, flows) + + flows, err = proto.OnPacket(test.MakePacket([]uint16{ + // Header + // Version, Length, Ts, SeqNo, Source + 10, 34, 11, 11, 22, 22, 0, 1234, + // Set (data record) + 999, 18, /*len of 999 record */ + 0x0102, 0x0304, // field 1 + 0x0506, 0x0708, // field 2 + // Field 3 + 0x5465, 0x7374, + 0x4d65, + }), addr) + assert.NoError(t, err) + assert.Len(t, flows, 1) + assert.Contains(t, flows[0].Fields, "customField") + assert.Equal(t, flows[0].Fields["customField"], "TestMe") +} diff --git a/x-pack/filebeat/input/netflow/decoder/template/test_helpers.go b/x-pack/filebeat/input/netflow/decoder/template/test_helpers.go index 3f33aafe2b51..48f41c9be761 100644 --- a/x-pack/filebeat/input/netflow/decoder/template/test_helpers.go +++ b/x-pack/filebeat/input/netflow/decoder/template/test_helpers.go @@ -20,7 +20,7 @@ var ( ) func buildDecoderByNameMap() { - for _, value := range fields.Fields { + for _, value := range fields.GlobalFields { decoderByName[value.Name] = value.Decoder } } diff --git a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go index 637430a37aac..f59726846525 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/decoder.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/decoder.go @@ -30,10 +30,12 @@ type Decoder interface { ReadTemplateSet(setID uint16, buf *bytes.Buffer) ([]*template.Template, error) ReadFieldDefinition(*bytes.Buffer) (field fields.Key, length uint16, err error) GetLogger() *log.Logger + GetFields() fields.FieldDict } type DecoderV9 struct { Logger *log.Logger + Fields fields.FieldDict } var _ Decoder = (*DecoderV9)(nil) @@ -92,7 +94,15 @@ func (d DecoderV9) ReadFieldDefinition(buf *bytes.Buffer) (field fields.Key, len return field, length, nil } +func 
(d DecoderV9) GetFields() fields.FieldDict { + if f := d.Fields; f != nil { + return f + } + return fields.GlobalFields +} + func ReadFields(d Decoder, buf *bytes.Buffer, count int) (record template.Template, err error) { + knownFields := d.GetFields() logger := d.GetLogger() record.Fields = make([]template.FieldTemplate, count) for i := 0; i < count; i++ { @@ -109,7 +119,7 @@ func ReadFields(d Decoder, buf *bytes.Buffer, count int) (record template.Templa } else { record.Length += int(field.Length) } - if fieldInfo, found := fields.Fields[key]; found { + if fieldInfo, found := knownFields[key]; found { min, max := fieldInfo.Decoder.MinLength(), fieldInfo.Decoder.MaxLength() if length == template.VariableLength || min <= field.Length && field.Length <= max { field.Info = fieldInfo diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9.go b/x-pack/filebeat/input/netflow/decoder/v9/v9.go index 5cbf3ee69151..c4efe44b56c0 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9.go @@ -39,7 +39,7 @@ func init() { func New(config config.Config) protocol.Protocol { logger := log.New(config.LogOutput(), LogPrefix, 0) - return NewProtocolWithDecoder(DecoderV9{logger}, config, logger) + return NewProtocolWithDecoder(DecoderV9{Logger: logger, Fields: config.Fields()}, config, logger) } func NewProtocolWithDecoder(decoder Decoder, config config.Config, logger *log.Logger) *NetflowV9Protocol { diff --git a/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go b/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go index 75ab11b80d2a..9a889b51f3da 100644 --- a/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go +++ b/x-pack/filebeat/input/netflow/decoder/v9/v9_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/config" + "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/fields" "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/test" ) @@ -180,3 +181,43 @@ func TestSessionReset(t *testing.T) { assert.Empty(t, flows) }) } + +func TestCustomFields(t *testing.T) { + addr := test.MakeAddress(t, "127.0.0.1:12345") + + conf := config.Defaults() + conf.WithCustomFields(fields.FieldDict{ + fields.Key{FieldID: 33333}: &fields.Field{Name: "customField", Decoder: fields.String}, + }) + assert.Contains(t, conf.Fields(), fields.Key{FieldID: 33333}) + proto := New(conf) + flows, err := proto.OnPacket(test.MakePacket([]uint16{ + // Header + // Version, Count, Uptime, Ts, SeqNo, Source + 9, 1, 11, 11, 22, 22, 33, 33, 0, 1234, + // Set #1 (template) + 0, 20, /*len of set*/ + 999, 3, /*len*/ + 1, 4, // Fields + 2, 4, + 33333, 8, + }), addr) + assert.NoError(t, err) + assert.Empty(t, flows) + + flows, err = proto.OnPacket(test.MakePacket([]uint16{ + // Header + // Version, Count, Uptime, Ts, SeqNo, Source + 9, 1, 11, 11, 22, 22, 33, 34, 0, 1234, + // Set #1 (template) + 999, 20, /*len of set*/ + 1, 1, + 2, 2, + 0x4865, 0x6c6c, + 0x6f20, 0x3a29, + }), addr) + assert.NoError(t, err) + assert.Len(t, flows, 1) + assert.Contains(t, flows[0].Fields, "customField") + assert.Equal(t, flows[0].Fields["customField"], "Hello :)") +} diff --git a/x-pack/filebeat/input/netflow/definitions.go b/x-pack/filebeat/input/netflow/definitions.go new file mode 100644 index 000000000000..3d1832d4dec5 --- /dev/null +++ b/x-pack/filebeat/input/netflow/definitions.go @@ -0,0 +1,217 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package netflow + +import ( + "fmt" + "io/ioutil" + "math" + "os" + "strconv" + + "github.com/pkg/errors" + "gopkg.in/yaml.v2" + + "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/fields" +) + +var logstashName2Decoder = map[string]fields.Decoder{ + "double": fields.Float64, + "float": fields.Float32, + "int8": fields.Signed8, + "int15": fields.SignedDecoder(15), + "int16": fields.Signed16, + "int24": fields.SignedDecoder(24), + "int32": fields.Signed32, + "int64": fields.Signed64, + "ip4_addr": fields.Ipv4Address, + "ip4addr": fields.Ipv4Address, + "ip6_addr": fields.Ipv6Address, + "ip6addr": fields.Ipv6Address, + "mac_addr": fields.MacAddress, + "macaddr": fields.MacAddress, + "string": fields.String, + "uint8": fields.Unsigned8, + "uint15": fields.UnsignedDecoder(15), + "uint16": fields.Unsigned16, + "uint24": fields.UnsignedDecoder(24), + "uint32": fields.Unsigned32, + "uint64": fields.Unsigned64, + "octet_array": fields.OctetArray, + "octetarray": fields.OctetArray, + "acl_id_asa": fields.UnsupportedDecoder{}, + "mpls_label_stack_octets": fields.UnsupportedDecoder{}, + "application_id": fields.UnsupportedDecoder{}, + "forwarding_status": fields.UnsupportedDecoder{}, +} + +// LoadFieldDefinitions takes a parsed YAML tree from a Logstash +// Netflow or IPFIX custom fields format and converts it to a FieldDict. +func LoadFieldDefinitions(yaml interface{}) (defs fields.FieldDict, err error) { + tree, ok := yaml.(map[interface{}]interface{}) + if !ok { + return nil, fmt.Errorf("invalid custom fields definition format: expected a mapping of integer keys. Got %T", yaml) + } + if len(tree) == 0 { + return nil, nil + } + isIPFIX, err := fieldsAreIPFIX(tree) + if err != nil { + return nil, err + } + defs = fields.FieldDict{} + if !isIPFIX { + if err := loadFields(tree, 0, defs); err != nil { + return nil, errors.Wrap(err, "failed to load NetFlow fields") + } + return defs, nil + } + for pemI, fields := range tree { + pem, err := toInt(pemI) + if err != nil { + return nil, err + } + if !fits(pem, 0, math.MaxUint32) { + return nil, fmt.Errorf("PEM %d out of uint32 range", pem) + } + tree, ok := fields.(map[interface{}]interface{}) + if !ok { + return nil, fmt.Errorf("IPFIX fields for pem=%d malformed", pem) + } + if err := loadFields(tree, uint32(pem), defs); err != nil { + return nil, errors.Wrapf(err, "failed to load IPFIX fields for pem=%d", pem) + } + } + return defs, nil +} + +// LoadFieldDefinitionsFromFile takes the path to a YAML file in Logstash +// Netflow or IPFIX custom fields format and converts it to a FieldDict. 
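+//
+// As a sketch, a flat NetFlow-format file like the following (mirroring the
+// Cisco ASA testdata added later in this patch) yields entries keyed under
+// enterprise 0:
+//
+//	40000:
+//	- :string
+//	- :username
+//	40005:
+//	- :uint8
+//	- :fw_event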
+func LoadFieldDefinitionsFromFile(path string) (defs fields.FieldDict, err error) { + file, err := os.Open(path) + if err != nil { + return nil, err + } + defer file.Close() + contents, err := ioutil.ReadAll(file) + if err != nil { + return nil, err + } + var tree interface{} + if err := yaml.Unmarshal(contents, &tree); err != nil { + return nil, errors.Wrap(err, "unable to parse YAML") + } + return LoadFieldDefinitions(tree) +} + +func fits(value, min, max int64) bool { + return value >= min && value <= max +} + +func trimColon(s string) string { + if len(s) > 0 && s[0] == ':' { + return s[1:] + } + return s +} + +func toInt(value interface{}) (int64, error) { + switch v := value.(type) { + case int64: + return v, nil + case int: + return int64(v), nil + case string: + return strconv.ParseInt(v, 0, 64) + } + return 0, fmt.Errorf("value %v cannot be converted to int", value) +} + +func loadFields(def map[interface{}]interface{}, pem uint32, dest fields.FieldDict) error { + for keyI, iface := range def { + fieldID, err := toInt(keyI) + if err != nil { + return err + } + if !fits(fieldID, 0, math.MaxUint16) { + return fmt.Errorf("field ID %d out of range uint16", fieldID) + } + list, ok := iface.([]interface{}) + if !ok { + return fmt.Errorf("field ID %d is not a list", fieldID) + } + bad := true + var fieldType, fieldName string + switch len(list) { + case 2: + switch v := list[0].(type) { + case string: + fieldType = trimColon(v) + case int: + if v == 0 { + v = 4 + } + fieldType = fmt.Sprintf("uint%d", v*8) + } + if name, ok := list[1].(string); ok { + fieldName = trimColon(name) + bad = len(fieldType) == 0 || len(fieldName) == 0 + } + case 1: + str, ok := list[0].(string) + if ok && trimColon(str) == "skip" { + continue + } + } + if bad { + return fmt.Errorf("bad field ID %d: should have two items (type, name) or one (:skip) (Got %+v)", fieldID, list) + } + key := fields.Key{ + EnterpriseID: uint32(pem), + FieldID: uint16(fieldID), + } + if _, exists := dest[key]; exists { + return fmt.Errorf("repeated field ID %d", fieldID) + } + decoder, found := logstashName2Decoder[fieldType] + if !found { + return fmt.Errorf("field ID %d has unknown type %s", fieldID, fieldType) + } + dest[key] = &fields.Field{ + Name: fieldName, + Decoder: decoder, + } + } + return nil +} + +func fieldsAreIPFIX(tree map[interface{}]interface{}) (bool, error) { + if len(tree) == 0 { + return false, errors.New("custom fields definition is empty") + } + var seenList, seenMap bool + for key, value := range tree { + var msg string + switch v := value.(type) { + case map[interface{}]interface{}: + seenMap = true + if seenList { + msg = "expected IPFIX map of fields" + } + case []interface{}: + seenList = true + if seenMap { + msg = "expected NetFlow single field definition" + } + default: + msg = fmt.Sprintf("unexpected format, got %T", v) + } + if len(msg) > 0 { + return false, fmt.Errorf("inconsistent custom fields definition format: %s at key %v", + msg, key) + } + } + return seenMap, nil +} diff --git a/x-pack/filebeat/input/netflow/definitions_test.go b/x-pack/filebeat/input/netflow/definitions_test.go new file mode 100644 index 000000000000..e1f219b05666 --- /dev/null +++ b/x-pack/filebeat/input/netflow/definitions_test.go @@ -0,0 +1,110 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
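+// The cases below exercise both accepted layouts: IPFIX definitions grouped
+// by PEN (including a hex key, 0x29a == 666) and the flat NetFlow layout,
+// with :skip entries, numeric default-length types, and names written with
+// or without the leading colon.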
+ +package netflow + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" + + "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/fields" +) + +func TestLoadFieldDefinitions(t *testing.T) { + for _, testCase := range []struct { + title, yaml string + expected fields.FieldDict + }{ + { + title: "IPFIX definitions", + yaml: ` +1234: + 0: + - :skip + 7: + - 4 + - :rawField + 11: + - :ip4_addr + - :ip4_field + 33: + - :ip6_addr + - :ipv6_field + 42: + - :int15 + - :dword_field +0x29a: + 128: + - :mac_addr + - :mac_field + 999: + - :string + - :name +`, + expected: fields.FieldDict{ + fields.Key{EnterpriseID: 1234, FieldID: 7}: &fields.Field{Name: "rawField", Decoder: fields.Unsigned32}, + fields.Key{EnterpriseID: 1234, FieldID: 11}: &fields.Field{Name: "ip4_field", Decoder: fields.Ipv4Address}, + fields.Key{EnterpriseID: 1234, FieldID: 33}: &fields.Field{Name: "ipv6_field", Decoder: fields.Ipv6Address}, + fields.Key{EnterpriseID: 1234, FieldID: 42}: &fields.Field{Name: "dword_field", Decoder: fields.SignedDecoder(15)}, + fields.Key{EnterpriseID: 666, FieldID: 128}: &fields.Field{Name: "mac_field", Decoder: fields.MacAddress}, + fields.Key{EnterpriseID: 666, FieldID: 999}: &fields.Field{Name: "name", Decoder: fields.String}, + }, + }, + { + title: "NetFlow definitions", + yaml: ` +1: + - :double + - MyDouble +2: + - :float + - :SomeFloat +3: + - skip +4: + - mac_addr + - :peerMac +5: + - 3 + - :rgbColor +6: + - :octet_array + - :bunchBytes +7: + - :skip +8: + - :forwarding_status + - :status +`, + expected: fields.FieldDict{ + fields.Key{EnterpriseID: 0, FieldID: 1}: &fields.Field{Name: "MyDouble", Decoder: fields.Float64}, + fields.Key{EnterpriseID: 0, FieldID: 2}: &fields.Field{Name: "SomeFloat", Decoder: fields.Float32}, + fields.Key{EnterpriseID: 0, FieldID: 4}: &fields.Field{Name: "peerMac", Decoder: fields.MacAddress}, + fields.Key{EnterpriseID: 0, FieldID: 5}: &fields.Field{Name: "rgbColor", Decoder: fields.UnsignedDecoder(24)}, + fields.Key{EnterpriseID: 0, FieldID: 6}: &fields.Field{Name: "bunchBytes", Decoder: fields.OctetArray}, + fields.Key{EnterpriseID: 0, FieldID: 8}: &fields.Field{Name: "status", Decoder: fields.UnsupportedDecoder{}}, + }, + }, + } { + t.Run(testCase.title, func(t *testing.T) { + var tree interface{} + if err := yaml.Unmarshal([]byte(testCase.yaml), &tree); err != nil { + t.Fatal(err) + } + defs, err := LoadFieldDefinitions(tree) + if !assert.NoError(t, err) { + t.Fatal(err) + } + if !assert.Len(t, defs, len(testCase.expected)) { + t.FailNow() + } + for key, value := range testCase.expected { + assert.Contains(t, defs, key) + assert.Equal(t, *value, *defs[key]) + } + }) + } +} diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index f86d0525ce00..f9483ea1bd1e 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -12,8 +12,6 @@ import ( "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common/atomic" - "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" @@ -22,9 +20,11 @@ import ( "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/monitoring" "github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder" + 
"github.com/elastic/beats/x-pack/filebeat/input/netflow/decoder/fields" ) const ( @@ -84,9 +84,18 @@ func NewInput( return nil, err } + var customFields []fields.FieldDict + for _, yamlPath := range config.CustomDefinitions { + f, err := LoadFieldDefinitionsFromFile(yamlPath) + if err != nil { + return nil, errors.Wrapf(err, "failed parsing custom field definitions from file '%s'", yamlPath) + } + customFields = append(customFields, f) + } decoder, err := decoder.NewDecoder(decoder.NewConfig(). WithProtocols(config.Protocols...). - WithExpiration(config.ExpirationTimeout)) + WithExpiration(config.ExpirationTimeout). + WithCustomFields(customFields...)) if err != nil { return nil, errors.Wrapf(err, "error initializing netflow decoder") } diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index bf30946d8db0..ce68e34ba525 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -38,12 +38,18 @@ const ( pcapDir = "testdata/pcap" datDir = "testdata/dat" goldenDir = "testdata/golden" + fieldsDir = "testdata/fields" datSourceIP = "192.0.2.1" ) // DatTests specifies the .dat files associated with test cases. type DatTests struct { - Tests map[string][]string `yaml:"tests"` + Tests map[string]TestCase `yaml:"tests"` +} + +type TestCase struct { + Files []string `yaml:"files"` + Fields []string `yaml:"custom_fields"` } // TestResult specifies the format of the result data that is written in a @@ -94,10 +100,10 @@ func TestPCAPFiles(t *testing.T) { func TestDatFiles(t *testing.T) { tests := readDatTests(t) - for name, files := range tests.Tests { + for name, testData := range tests.Tests { t.Run(name, func(t *testing.T) { goldenName := filepath.Join(goldenDir, sanitizer.Replace(name)+".golden.json") - result := getFlowsFromDat(t, name, files...) + result := getFlowsFromDat(t, name, testData) if *update { data, err := json.MarshalIndent(result, "", " ") @@ -148,7 +154,7 @@ func readDatTests(t testing.TB) *DatTests { return &tests } -func getFlowsFromDat(t testing.TB, name string, datFiles ...string) TestResult { +func getFlowsFromDat(t testing.TB, name string, testCase TestCase) TestResult { t.Helper() config := decoder.NewConfig(). @@ -157,6 +163,14 @@ func getFlowsFromDat(t testing.TB, name string, datFiles ...string) TestResult { WithExpiration(0). 
WithLogOutput(test.TestLogWriter{TB: t}) + for _, fieldFile := range testCase.Fields { + fields, err := LoadFieldDefinitionsFromFile(filepath.Join(fieldsDir, fieldFile)) + if err != nil { + t.Fatal(err, fieldFile) + } + config = config.WithCustomFields(fields) + } + decoder, err := decoder.NewDecoder(config) if !assert.NoError(t, err) { t.Fatal(err) @@ -164,7 +178,7 @@ func getFlowsFromDat(t testing.TB, name string, datFiles ...string) TestResult { source := test.MakeAddress(t, datSourceIP+":4444") var events []beat.Event - for _, f := range datFiles { + for _, f := range testCase.Files { dat, err := ioutil.ReadFile(filepath.Join(datDir, f)) if err != nil { t.Fatal(err) diff --git a/x-pack/filebeat/input/netflow/testdata/dat_tests.yaml b/x-pack/filebeat/input/netflow/testdata/dat_tests.yaml index e04f392d7dc7..cd4b40482613 100644 --- a/x-pack/filebeat/input/netflow/testdata/dat_tests.yaml +++ b/x-pack/filebeat/input/netflow/testdata/dat_tests.yaml @@ -2,132 +2,180 @@ tests: IPFIX vIPtela with VPN id: - - ipfix_test_viptela_tpl257.dat - - ipfix_test_viptela_data257.dat + files: + - ipfix_test_viptela_tpl257.dat + - ipfix_test_viptela_data257.dat IPFIX Barracuda firewall: - - ipfix_test_barracuda_tpl.dat - - ipfix_test_barracuda_data256.dat + files: + - ipfix_test_barracuda_tpl.dat + - ipfix_test_barracuda_data256.dat IPFIX YAF basic with applabel: - - ipfix_test_yaf_tpls_option_tpl.dat - - ipfix_test_yaf_tpl45841.dat - - ipfix_test_yaf_data45841.dat - - ipfix_test_yaf_data45873.dat - - ipfix_test_yaf_data53248.dat + files: + - ipfix_test_yaf_tpls_option_tpl.dat + - ipfix_test_yaf_tpl45841.dat + - ipfix_test_yaf_data45841.dat + - ipfix_test_yaf_data45873.dat + - ipfix_test_yaf_data53248.dat IPFIX Netscaler with variable length fields, missing templates: - - ipfix_test_netscaler_data.dat + files: + - ipfix_test_netscaler_data.dat IPFIX configured with include_flowset_id: - - ipfix_test_netscaler_tpl.dat - - ipfix_test_netscaler_data.dat + files: + - ipfix_test_netscaler_tpl.dat + - ipfix_test_netscaler_data.dat IPFIX: - - ipfix.dat + files: + - ipfix.dat IPFIX OpenBSD pflow: - - ipfix_test_openbsd_pflow_tpl.dat - - ipfix_test_openbsd_pflow_data.dat + files: + - ipfix_test_openbsd_pflow_tpl.dat + - ipfix_test_openbsd_pflow_data.dat IPFIX options template from Juniper MX240 JunOS 15.1 R6 S3: - - ipfix_test_juniper_mx240_junos151r6s3_opttpl512.dat - - ipfix_test_juniper_mx240_junos151r6s3_data512.dat + files: + - ipfix_test_juniper_mx240_junos151r6s3_opttpl512.dat + - ipfix_test_juniper_mx240_junos151r6s3_data512.dat IPFIX Nokia BRAS: - - ipfix_test_nokia_bras_tpl.dat - - ipfix_test_nokia_bras_data256.dat + files: + - ipfix_test_nokia_bras_tpl.dat + - ipfix_test_nokia_bras_data256.dat IPFIX Procera: - - ipfix_test_procera_tpl52935.dat - - ipfix_test_procera_data52935.dat + files: + - ipfix_test_procera_tpl52935.dat + - ipfix_test_procera_data52935.dat IPFIX Barracuda extended uniflow template 256: - - ipfix_test_barracuda_extended_uniflow_tpl256.dat - - ipfix_test_barracuda_extended_uniflow_data256.dat + files: + - ipfix_test_barracuda_extended_uniflow_tpl256.dat + - ipfix_test_barracuda_extended_uniflow_data256.dat IPFIX Mikrotik RouterOS 6.39.2: - - ipfix_test_mikrotik_tpl.dat - - ipfix_test_mikrotik_data258.dat - - ipfix_test_mikrotik_data259.dat + files: + - ipfix_test_mikrotik_tpl.dat + - ipfix_test_mikrotik_data258.dat + - ipfix_test_mikrotik_data259.dat IPFIX Netscaler with variable length fields: - - ipfix_test_netscaler_tpl.dat - - ipfix_test_netscaler_data.dat + files: + - 
ipfix_test_netscaler_tpl.dat + - ipfix_test_netscaler_data.dat IPFIX VMware virtual distributed switch: - - ipfix_test_vmware_vds_tpl.dat - - ipfix_test_vmware_vds_data264.dat - - ipfix_test_vmware_vds_data266.dat - - ipfix_test_vmware_vds_data266_267.dat + files: + - ipfix_test_vmware_vds_tpl.dat + - ipfix_test_vmware_vds_data264.dat + - ipfix_test_vmware_vds_data266.dat + - ipfix_test_vmware_vds_data266_267.dat Netflow 9 valid 01: - - netflow9_test_valid01.dat + files: + - netflow9_test_valid01.dat Netflow 9 macaddress: - - netflow9_test_macaddr_tpl.dat - - netflow9_test_macaddr_data.dat + files: + - netflow9_test_macaddr_tpl.dat + - netflow9_test_macaddr_data.dat Netflow 9 Cisco ASA: - - netflow9_test_cisco_asa_1_tpl.dat - - netflow9_test_cisco_asa_1_data.dat + files: + - netflow9_test_cisco_asa_1_tpl.dat + - netflow9_test_cisco_asa_1_data.dat + custom_fields: + - netflow9_cisco_asa_custom.yaml Netflow 9 multiple netflow exporters: - - netflow9_test_nprobe_tpl.dat - - netflow9_test_softflowd_tpl_data.dat - - netflow9_test_nprobe_data.dat + files: + - netflow9_test_nprobe_tpl.dat + - netflow9_test_softflowd_tpl_data.dat + - netflow9_test_nprobe_data.dat Netflow 9 invalid 01: - - netflow9_test_invalid01.dat + files: + - netflow9_test_invalid01.dat Netflow 9 options template with scope fields: - - netflow9_test_nprobe_tpl.dat + files: + - netflow9_test_nprobe_tpl.dat Netflow 9 Cisco ASA 2: - - netflow9_test_cisco_asa_2_tpl_26x.dat - - netflow9_test_cisco_asa_2_tpl_27x.dat - - netflow9_test_cisco_asa_2_data.dat + files: + - netflow9_test_cisco_asa_2_tpl_26x.dat + - netflow9_test_cisco_asa_2_tpl_27x.dat + - netflow9_test_cisco_asa_2_data.dat + custom_fields: + - netflow9_cisco_asa_custom.yaml Netflow 9 ipt_netflow reduced size encoding: - - netflow9_test_iptnetflow_reduced_size_encoding_tpldata260.dat + files: + - netflow9_test_iptnetflow_reduced_size_encoding_tpldata260.dat Netflow 9 H3C: - - netflow9_test_h3c_tpl3281.dat - - netflow9_test_h3c_data3281.dat + files: + - netflow9_test_h3c_tpl3281.dat + - netflow9_test_h3c_data3281.dat Netflow 9 IE150 IE151: - - netflow9_test_unknown_tpl266_292_data.dat + files: + - netflow9_test_unknown_tpl266_292_data.dat Netflow 9 Palo Alto 1 flowset in large zero filled packet: - - netflow9_test_paloalto_81_tpl256-263.dat - - netflow9_test_paloalto_81_data257_1flowset_in_large_zerofilled_packet.dat + files: + - netflow9_test_paloalto_81_tpl256-263.dat + - netflow9_test_paloalto_81_data257_1flowset_in_large_zerofilled_packet.dat Netflow 9 H3C Netstream with varstring: - - netflow9_test_h3c_netstream_varstring_tpl3281.dat - - netflow9_test_h3c_netstream_varstring_data3281.dat + files: + - netflow9_test_h3c_netstream_varstring_tpl3281.dat + - netflow9_test_h3c_netstream_varstring_data3281.dat Netflow 9 Fortigate FortiOS 54x appid: - - netflow9_test_fortigate_fortios_542_appid_tpl258-269.dat - - netflow9_test_fortigate_fortios_542_appid_data258_262.dat + files: + - netflow9_test_fortigate_fortios_542_appid_tpl258-269.dat + - netflow9_test_fortigate_fortios_542_appid_data258_262.dat Netflow 9 Ubiquiti Edgerouter with MPLS labels: - - netflow9_test_ubnt_edgerouter_tpl.dat - - netflow9_test_ubnt_edgerouter_data1024.dat - - netflow9_test_ubnt_edgerouter_data1025.dat + files: + - netflow9_test_ubnt_edgerouter_tpl.dat + - netflow9_test_ubnt_edgerouter_data1024.dat + - netflow9_test_ubnt_edgerouter_data1025.dat Netflow 9 nprobe DPI L7: - - netflow9_test_nprobe_dpi.dat + files: + - netflow9_test_nprobe_dpi.dat Netflow 9 Fortigate FortiOS 5.2.1: - - 
netflow9_test_fortigate_fortios_521_tpl.dat - - netflow9_test_fortigate_fortios_521_data256.dat - - netflow9_test_fortigate_fortios_521_data257.dat + files: + - netflow9_test_fortigate_fortios_521_tpl.dat + - netflow9_test_fortigate_fortios_521_data256.dat + - netflow9_test_fortigate_fortios_521_data257.dat Netflow 9 Streamcore: - - netflow9_test_streamcore_tpl_data256.dat - - netflow9_test_streamcore_tpl_data260.dat + files: + - netflow9_test_streamcore_tpl_data256.dat + - netflow9_test_streamcore_tpl_data260.dat Netflow9 Juniper SRX options template with 0 scope field length: - - netflow9_test_juniper_srx_tplopt.dat + files: + - netflow9_test_juniper_srx_tplopt.dat Netflow 9 template with 0 length fields: - - netflow9_test_0length_fields_tpl_data.dat + files: + - netflow9_test_0length_fields_tpl_data.dat Netflow 9 Cisco ASR 9000 series options template 256: - - netflow9_test_cisco_asr9k_opttpl256.dat - - netflow9_test_cisco_asr9k_data256.dat + files: + - netflow9_test_cisco_asr9k_opttpl256.dat + - netflow9_test_cisco_asr9k_data256.dat Netflow 9 Huawei Netstream: - - netflow9_test_huawei_netstream_tpl.dat - - netflow9_test_huawei_netstream_data.dat + files: + - netflow9_test_huawei_netstream_tpl.dat + - netflow9_test_huawei_netstream_data.dat Netflow 9 field layer2segmentid: - - netflow9_test_field_layer2segmentid_tpl.dat - - netflow9_test_field_layer2segmentid_data.dat + files: + - netflow9_test_field_layer2segmentid_tpl.dat + - netflow9_test_field_layer2segmentid_data.dat Netflow 9 Cisco ASR 9000 series template 260: - - netflow9_test_cisco_asr9k_tpl260.dat - - netflow9_test_cisco_asr9k_data260.dat + files: + - netflow9_test_cisco_asr9k_tpl260.dat + - netflow9_test_cisco_asr9k_data260.dat Netflow 9 Cisco NBAR options template 260: - - netflow9_test_cisco_nbar_opttpl260.dat + files: + - netflow9_test_cisco_nbar_opttpl260.dat Netflow 9 Cisco NBAR flowset 262: - - netflow9_test_cisco_nbar_tpl262.dat - - netflow9_test_cisco_nbar_data262.dat + files: + - netflow9_test_cisco_nbar_tpl262.dat + - netflow9_test_cisco_nbar_data262.dat Netflow 9 Cisco WLC: - - netflow9_test_cisco_wlc_tpl.dat - - netflow9_test_cisco_wlc_data261.dat + files: + - netflow9_test_cisco_wlc_tpl.dat + - netflow9_test_cisco_wlc_data261.dat Netflow 9 Cisco WLC 8500 release 8.2: - - netflow9_test_cisco_wlc_8510_tpl_262.dat + files: + - netflow9_test_cisco_wlc_8510_tpl_262.dat Netflow 9 Cisco 1941/K9 release 15.1: - - netflow9_test_cisco_1941K9.dat + files: + - netflow9_test_cisco_1941K9.dat Netflow 9 Cisco ASR1001-X: - - netflow9_cisco_asr1001x_tpl259.dat + files: + - netflow9_cisco_asr1001x_tpl259.dat Netflow 9 Palo Alto PAN-OS with app-id: - - netflow9_test_paloalto_panos_tpl.dat - - netflow9_test_paloalto_panos_data.dat + files: + - netflow9_test_paloalto_panos_tpl.dat + - netflow9_test_paloalto_panos_data.dat diff --git a/x-pack/filebeat/input/netflow/testdata/fields/netflow9_cisco_asa_custom.yaml b/x-pack/filebeat/input/netflow/testdata/fields/netflow9_cisco_asa_custom.yaml new file mode 100644 index 000000000000..bcf40b96760d --- /dev/null +++ b/x-pack/filebeat/input/netflow/testdata/fields/netflow9_cisco_asa_custom.yaml @@ -0,0 +1,27 @@ +33000: +- :acl_id_asa +- :ingress_acl_id +33001: +- :acl_id_asa +- egress_acl_id +33002: +- :uint16 +- :fw_ext_event +40000: +- :string +- :username +40001: +- :ip4_addr +- :xlate_src_addr_ipv4 +40002: +- :ip4_addr +- :xlate_dst_addr_ipv4 +40003: +- :uint16 +- :xlate_src_port +40004: +- :uint16 +- :xlate_dst_port +40005: +- :uint8 +- :fw_event diff --git 
a/x-pack/filebeat/input/netflow/testdata/golden/Netflow-9-Cisco-ASA-2.golden.json b/x-pack/filebeat/input/netflow/testdata/golden/Netflow-9-Cisco-ASA-2.golden.json index dfc1898f1e4e..409a4e910b15 100644 --- a/x-pack/filebeat/input/netflow/testdata/golden/Netflow-9-Cisco-ASA-2.golden.json +++ b/x-pack/filebeat/input/netflow/testdata/golden/Netflow-9-Cisco-ASA-2.golden.json @@ -34,6 +34,7 @@ "firewall_event": 2, "flow_id": 742819709, "flow_start_milliseconds": "2016-07-21T13:50:32.955Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -99,6 +100,7 @@ "firewall_event": 5, "flow_id": 742819710, "flow_start_milliseconds": "2016-07-21T13:50:32.955Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -164,6 +166,7 @@ "firewall_event": 2, "flow_id": 742819710, "flow_start_milliseconds": "2016-07-21T13:50:32.955Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -229,6 +232,7 @@ "firewall_event": 5, "flow_id": 742819619, "flow_start_milliseconds": "2016-07-21T13:50:32.475Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -294,6 +298,7 @@ "firewall_event": 2, "flow_id": 742819619, "flow_start_milliseconds": "2016-07-21T13:50:32.475Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -359,6 +364,7 @@ "firewall_event": 5, "flow_id": 742819707, "flow_start_milliseconds": "2016-07-21T13:50:32.955Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -424,6 +430,7 @@ "firewall_event": 2, "flow_id": 742819707, "flow_start_milliseconds": "2016-07-21T13:50:32.955Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -489,6 +496,7 @@ "firewall_event": 1, "flow_id": 742819773, "flow_start_milliseconds": "2016-07-21T13:50:33.385Z", + "fw_ext_event": 0, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -500,7 +508,8 @@ "protocol_identifier": 6, "source_ipv4_address": "192.168.0.1", "source_transport_port": 56649, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "" }, "network": { "community_id": "1:IZ8RrSqt8oeb2F2Rp9296zm54bc=", @@ -552,6 +561,7 @@ "firewall_event": 5, "flow_id": 742819773, "flow_start_milliseconds": "2016-07-21T13:50:33.385Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -617,6 +627,7 @@ "firewall_event": 2, "flow_id": 742819773, "flow_start_milliseconds": "2016-07-21T13:50:33.385Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -682,6 +693,7 @@ "firewall_event": 1, "flow_id": 742820025, "flow_start_milliseconds": "2016-07-21T13:50:35.035Z", + "fw_ext_event": 0, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -693,7 +705,8 @@ "protocol_identifier": 6, "source_ipv4_address": "192.168.0.2", "source_transport_port": 61777, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "" }, "network": { "community_id": "1:E1vNamQGw5X+X+vT1g7ui6Nc3O0=", @@ -745,6 +758,7 @@ "firewall_event": 5, "flow_id": 742820025, "flow_start_milliseconds": "2016-07-21T13:50:35.035Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -810,6 +824,7 @@ "firewall_event": 2, "flow_id": 742820025, "flow_start_milliseconds": "2016-07-21T13:50:35.035Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, 
"ingress_interface": 3, @@ -875,6 +890,7 @@ "firewall_event": 1, "flow_id": 742820153, "flow_start_milliseconds": "2016-07-21T13:50:35.785Z", + "fw_ext_event": 0, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -886,7 +902,8 @@ "protocol_identifier": 6, "source_ipv4_address": "192.168.0.1", "source_transport_port": 56650, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "" }, "network": { "community_id": "1:pkwcoe/zjCLerUgj+HGAwwt4wV8=", @@ -938,6 +955,7 @@ "firewall_event": 5, "flow_id": 742820153, "flow_start_milliseconds": "2016-07-21T13:50:35.785Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -1003,6 +1021,7 @@ "firewall_event": 2, "flow_id": 742820153, "flow_start_milliseconds": "2016-07-21T13:50:35.785Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -1068,6 +1087,7 @@ "firewall_event": 1, "flow_id": 742820223, "flow_start_milliseconds": "2016-07-21T13:50:36.395Z", + "fw_ext_event": 0, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -1079,7 +1099,8 @@ "protocol_identifier": 6, "source_ipv4_address": "192.168.0.1", "source_transport_port": 56651, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "" }, "network": { "community_id": "1:35/w0D/WO1QvBp8O+Vd95Nb+tt4=", @@ -1131,6 +1152,7 @@ "firewall_event": 5, "flow_id": 742820223, "flow_start_milliseconds": "2016-07-21T13:50:36.395Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -1196,6 +1218,7 @@ "firewall_event": 2, "flow_id": 742820223, "flow_start_milliseconds": "2016-07-21T13:50:36.395Z", + "fw_ext_event": 2030, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, diff --git a/x-pack/filebeat/input/netflow/testdata/golden/Netflow-9-Cisco-ASA.golden.json b/x-pack/filebeat/input/netflow/testdata/golden/Netflow-9-Cisco-ASA.golden.json index 88dd710d2112..cf2913314459 100644 --- a/x-pack/filebeat/input/netflow/testdata/golden/Netflow-9-Cisco-ASA.golden.json +++ b/x-pack/filebeat/input/netflow/testdata/golden/Netflow-9-Cisco-ASA.golden.json @@ -33,6 +33,8 @@ }, "flow_id": 8500, "flow_start_milliseconds": "2015-10-09T09:47:47.569Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -41,7 +43,12 @@ "protocol_identifier": 1, "source_ipv4_address": "192.168.14.1", "source_transport_port": 0, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "2.2.2.11", + "xlate_dst_port": 17549, + "xlate_src_addr_ipv4": "192.168.14.1", + "xlate_src_port": 0 }, "network": { "bytes": 56, @@ -93,6 +100,8 @@ }, "flow_id": 8501, "flow_start_milliseconds": "2015-10-09T09:47:48.169Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 8, "ingress_interface": 2, @@ -101,7 +110,12 @@ "protocol_identifier": 1, "source_ipv4_address": "192.168.23.22", "source_transport_port": 17549, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "164.164.37.11", + "xlate_dst_port": 0, + "xlate_src_addr_ipv4": "192.168.23.22", + "xlate_src_port": 17549 }, "network": { "bytes": 56, @@ -153,6 +167,8 @@ }, "flow_id": 8502, "flow_start_milliseconds": "2015-10-09T09:47:48.179Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -161,7 +177,12 @@ "protocol_identifier": 1, "source_ipv4_address": "164.164.37.11", 
"source_transport_port": 0, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "192.168.23.22", + "xlate_dst_port": 17549, + "xlate_src_addr_ipv4": "164.164.37.11", + "xlate_src_port": 0 }, "network": { "bytes": 56, @@ -213,6 +234,8 @@ }, "flow_id": 8503, "flow_start_milliseconds": "2015-10-09T09:47:48.399Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 8, "ingress_interface": 2, @@ -221,7 +244,12 @@ "protocol_identifier": 1, "source_ipv4_address": "192.168.23.20", "source_transport_port": 17805, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "164.164.37.11", + "xlate_dst_port": 0, + "xlate_src_addr_ipv4": "192.168.23.20", + "xlate_src_port": 17805 }, "network": { "bytes": 56, @@ -273,6 +301,8 @@ }, "flow_id": 8504, "flow_start_milliseconds": "2015-10-09T09:47:48.409Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -281,7 +311,12 @@ "protocol_identifier": 1, "source_ipv4_address": "164.164.37.11", "source_transport_port": 0, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "192.168.23.20", + "xlate_dst_port": 17805, + "xlate_src_addr_ipv4": "164.164.37.11", + "xlate_src_port": 0 }, "network": { "bytes": 56, @@ -333,6 +368,8 @@ }, "flow_id": 8505, "flow_start_milliseconds": "2015-10-09T09:47:48.589Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 8, "ingress_interface": 3, @@ -341,7 +378,12 @@ "protocol_identifier": 1, "source_ipv4_address": "192.168.14.11", "source_transport_port": 17805, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "2.2.2.11", + "xlate_dst_port": 0, + "xlate_src_addr_ipv4": "192.168.14.11", + "xlate_src_port": 17805 }, "network": { "bytes": 56, @@ -393,6 +435,8 @@ }, "flow_id": 8506, "flow_start_milliseconds": "2015-10-09T09:47:48.599Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 2, @@ -401,7 +445,12 @@ "protocol_identifier": 1, "source_ipv4_address": "2.2.2.11", "source_transport_port": 0, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "192.168.14.11", + "xlate_dst_port": 17805, + "xlate_src_addr_ipv4": "2.2.2.11", + "xlate_src_port": 0 }, "network": { "bytes": 56, @@ -453,6 +502,8 @@ }, "flow_id": 8507, "flow_start_milliseconds": "2015-10-09T09:47:48.609Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 8, "ingress_interface": 2, @@ -461,7 +512,12 @@ "protocol_identifier": 1, "source_ipv4_address": "2.2.2.11", "source_transport_port": 17805, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "192.168.14.1", + "xlate_dst_port": 0, + "xlate_src_addr_ipv4": "2.2.2.11", + "xlate_src_port": 17805 }, "network": { "bytes": 56, @@ -513,6 +569,8 @@ }, "flow_id": 8508, "flow_start_milliseconds": "2015-10-09T09:47:48.619Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -521,7 +579,12 @@ "protocol_identifier": 1, "source_ipv4_address": "192.168.14.1", "source_transport_port": 0, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "2.2.2.11", + "xlate_dst_port": 17805, + "xlate_src_addr_ipv4": "192.168.14.1", + "xlate_src_port": 0 }, "network": { "bytes": 56, @@ -573,6 +636,8 @@ }, 
"flow_id": 8525, "flow_start_milliseconds": "2015-10-09T09:47:51.269Z", + "fw_event": 2, + "fw_ext_event": 2016, "icmp_code_ipv4": 3, "icmp_type_ipv4": 3, "ingress_interface": 3, @@ -581,7 +646,12 @@ "protocol_identifier": 1, "source_ipv4_address": "164.164.37.11", "source_transport_port": 0, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "192.168.23.1", + "xlate_dst_port": 0, + "xlate_src_addr_ipv4": "164.164.37.11", + "xlate_src_port": 0 }, "network": { "bytes": 160, @@ -633,6 +703,8 @@ }, "flow_id": 8509, "flow_start_milliseconds": "2015-10-09T09:47:49.249Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 8, "ingress_interface": 2, @@ -641,7 +713,12 @@ "protocol_identifier": 1, "source_ipv4_address": "192.168.23.22", "source_transport_port": 18061, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "164.164.37.11", + "xlate_dst_port": 0, + "xlate_src_addr_ipv4": "192.168.23.22", + "xlate_src_port": 18061 }, "network": { "bytes": 56, @@ -693,6 +770,8 @@ }, "flow_id": 8510, "flow_start_milliseconds": "2015-10-09T09:47:49.259Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -701,7 +780,12 @@ "protocol_identifier": 1, "source_ipv4_address": "164.164.37.11", "source_transport_port": 0, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "192.168.23.22", + "xlate_dst_port": 18061, + "xlate_src_addr_ipv4": "164.164.37.11", + "xlate_src_port": 0 }, "network": { "bytes": 56, @@ -753,6 +837,8 @@ }, "flow_id": 8511, "flow_start_milliseconds": "2015-10-09T09:47:49.469Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 8, "ingress_interface": 2, @@ -761,7 +847,12 @@ "protocol_identifier": 1, "source_ipv4_address": "192.168.23.20", "source_transport_port": 18061, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "164.164.37.11", + "xlate_dst_port": 0, + "xlate_src_addr_ipv4": "192.168.23.20", + "xlate_src_port": 18061 }, "network": { "bytes": 56, @@ -813,6 +904,8 @@ }, "flow_id": 8512, "flow_start_milliseconds": "2015-10-09T09:47:49.479Z", + "fw_event": 2, + "fw_ext_event": 2025, "icmp_code_ipv4": 0, "icmp_type_ipv4": 0, "ingress_interface": 3, @@ -821,7 +914,12 @@ "protocol_identifier": 1, "source_ipv4_address": "164.164.37.11", "source_transport_port": 0, - "type": "netflow_flow" + "type": "netflow_flow", + "username": "", + "xlate_dst_addr_ipv4": "192.168.23.20", + "xlate_dst_port": 18061, + "xlate_src_addr_ipv4": "164.164.37.11", + "xlate_src_port": 0 }, "network": { "bytes": 56, From 158bdec1f270f64dbcf5ed22f2fb7c635f5ab39d Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Wed, 13 Mar 2019 18:42:30 +0100 Subject: [PATCH 2/2] Update CHANGELOG --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a3d5b1ad3bbd..2b166f07aa68 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -105,6 +105,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di *Filebeat* - Add ISO8601 timestamp support in syslog metricset. {issue}8716[8716] {pull}10736[10736] +- Add support for loading custom NetFlow and IPFIX field definitions to netflow input. {pull}10945[10945] {pull}11223[11223] *Heartbeat*