diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 64a425b99a24..06622b8452ea 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Add support of log_format combined to NGINX access logs. {pull}6858[6858] - Release config reloading feature as GA. - Add support human friendly size for the UDP input. {pull}6886[6886] +- Add Syslog input to ingest RFC3164 Events via TCP and UDP {pull}6842[6842] *Heartbeat* diff --git a/filebeat/_meta/common.reference.p2.yml b/filebeat/_meta/common.reference.p2.yml index 74dce6f47221..1ab1c9c3f38e 100644 --- a/filebeat/_meta/common.reference.p2.yml +++ b/filebeat/_meta/common.reference.p2.yml @@ -241,7 +241,7 @@ filebeat.inputs: # Maximum size of the message received over UDP #max_message_size: 10KiB -#------------------------------ TCP prospector -------------------------------- +#------------------------------ TCP input -------------------------------- # Experimental: Config options for the TCP input #- type: tcp #enabled: false @@ -258,6 +258,35 @@ filebeat.inputs: # The number of seconds of inactivity before a remote connection is closed. #timeout: 300s +#------------------------------ Syslog input -------------------------------- +# Experimental: Config options for the Syslog input +# Accept RFC3164 formatted syslog event via UDP. +#- type: syslog + #enabled: false + #protocol.udp: + # The host and port to receive the new event + #host: "localhost:9000" + + # Maximum size of the message received over UDP + #max_message_size: 10KiB + +# Accept RFC3164 formatted syslog event via TCP. +#- type: syslog + #enabled: false + + #protocol.tcp: + # The host and port to receive the new event + #host: "localhost:9000" + + # Character used to split new message + #line_delimiter: "\n" + + # Maximum size in bytes of the message received over TCP + #max_message_size: 20MiB + + # The number of seconds of inactivity before a remote connection is closed. + #timeout: 300s + #========================== Filebeat autodiscover ============================== # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/filebeat/_meta/fields.common.yml b/filebeat/_meta/fields.common.yml index 94d87dd7c787..1a0202655033 100644 --- a/filebeat/_meta/fields.common.yml +++ b/filebeat/_meta/fields.common.yml @@ -54,3 +54,45 @@ - name: fileset.name description: > The Filebeat fileset that generated this event. + + - name: syslog.facility + type: long + required: false + description: > + The facility extracted from the priority. + + - name: syslog.priority + type: long + required: false + description: > + The priority of the syslog event. + + - name: syslog.severity_label + type: keyword + required: false + description: > + The human readable severity. + + - name: syslog.facility_label + type: keyword + required: false + description: > + The human readable facility. + + - name: process.program + type: keyword + required: false + description: > + The name of the program. + + - name: process.pid + type: long + required: false + description: > + The pid of the process. + + - name: event.severity + type: long + required: false + description: > + The severity of the event. diff --git a/filebeat/docs/fields.asciidoc b/filebeat/docs/fields.asciidoc index 323759c6c054..bd23ef709ef8 100644 --- a/filebeat/docs/fields.asciidoc +++ b/filebeat/docs/fields.asciidoc @@ -1806,6 +1806,90 @@ The Filebeat module that generated this event. The Filebeat fileset that generated this event. +-- + +*`syslog.facility`*:: ++ +-- +type: long + +required: False + +The facility extracted from the priority. + + +-- + +*`syslog.priority`*:: ++ +-- +type: long + +required: False + +The priority of the syslog event. + + +-- + +*`syslog.severity_label`*:: ++ +-- +type: keyword + +required: False + +The human readable severity. + + +-- + +*`syslog.facility_label`*:: ++ +-- +type: keyword + +required: False + +The human readable facility. + + +-- + +*`process.program`*:: ++ +-- +type: keyword + +required: False + +The name of the program. + + +-- + +*`process.pid`*:: ++ +-- +type: long + +required: False + +The pid of the process. + + +-- + +*`event.severity`*:: ++ +-- +type: long + +required: False + +The severity of the event. + + -- [[exported-fields-logstash]] diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index ba2109e7e33b..c2fcceae33c8 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -48,6 +48,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-udp>> * <<{beatname_lc}-input-docker>> * <<{beatname_lc}-input-tcp>> +* <<{beatname_lc}-input-syslog>> @@ -62,3 +63,5 @@ include::inputs/input-udp.asciidoc[] include::inputs/input-docker.asciidoc[] include::inputs/input-tcp.asciidoc[] + +include::inputs/input-syslog.asciidoc[] diff --git a/filebeat/docs/inputs/input-common-tcp-options.asciidoc b/filebeat/docs/inputs/input-common-tcp-options.asciidoc new file mode 100644 index 000000000000..c82dd7ee4f0f --- /dev/null +++ b/filebeat/docs/inputs/input-common-tcp-options.asciidoc @@ -0,0 +1,29 @@ +////////////////////////////////////////////////////////////////////////// +//// This content is shared by Filebeat inputs that use the TCP inputsource +//// If you add IDs to sections, make sure you use attributes to create +//// unique IDs for each input that includes this file. Use the format: +//// [id="{beatname_lc}-input-{type}-option-name"] +////////////////////////////////////////////////////////////////////////// +[float] +[id="{beatname_lc}-input-{type}-tcp-max-message-size"] +==== `max_message_size` + +The maximum size of the message received over TCP. The default is `20MiB`. + +[float] +[id="{beatname_lc}-input-{type}-tcp-host"] +==== `host` + +The host and TCP port to listen on for event streams. + +[float] +[id="{beatname_lc}-input-{type}-tcp-line-delimiter"] +==== `line_delimiter` + +Specify the characters used to split the incoming events. The default is '\n'. + +[float] +[id="{beatname_lc}-input-{type}-tcp-timeout"] +==== `timeout` + +The number of seconds of inactivity before a remote connection is closed. The default is `300s`. diff --git a/filebeat/docs/inputs/input-common-udp-options.asciidoc b/filebeat/docs/inputs/input-common-udp-options.asciidoc new file mode 100644 index 000000000000..27068d5346d3 --- /dev/null +++ b/filebeat/docs/inputs/input-common-udp-options.asciidoc @@ -0,0 +1,17 @@ +////////////////////////////////////////////////////////////////////////// +//// This content is shared by Filebeat inputs that use the UDP inputsource +//// If you add IDs to sections, make sure you use attributes to create +//// unique IDs for each input that includes this file. Use the format: +//// [id="{beatname_lc}-input-{type}-option-name"] +////////////////////////////////////////////////////////////////////////// +[float] +[id="{beatname_lc}-input-{type}-udp-max-message-size"] +==== `max_message_size` + +The maximum size of the message received over UDP. The default is `10KiB`. + +[float] +[id="{beatname_lc}-input-{type}-udp-host"] +==== `host` + +The host and UDP port to listen on for event streams. diff --git a/filebeat/docs/inputs/input-syslog.asciidoc b/filebeat/docs/inputs/input-syslog.asciidoc new file mode 100644 index 000000000000..36a45bbd722d --- /dev/null +++ b/filebeat/docs/inputs/input-syslog.asciidoc @@ -0,0 +1,47 @@ +:type: syslog + +[id="{beatname_lc}-input-{type}"] +=== Syslog input + +++++ +Syslog +++++ + +Use the `syslog` input to read events over TCP or UDP, this input will parse BSD (rfc3164) +event and some variant. + +Example configurations: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: syslog + protocol.udp: + host: "localhost:9000" +---- + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: syslog + protocol.tcp: + host: "localhost:9000" +---- + +==== Configuration options + +The `syslog` input supports protocol specific configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +Protocol `udp`: + +include::../inputs/input-common-udp-options.asciidoc[] + +Protocol `tcp`: + +include::../inputs/input-common-tcp-options.asciidoc[] + +[id="{beatname_lc}-input-{type}-common-options"] +include::../inputs/input-common-options.asciidoc[] + +:type!: diff --git a/filebeat/docs/inputs/input-tcp.asciidoc b/filebeat/docs/inputs/input-tcp.asciidoc index 685bee40661d..b373a36d96c5 100644 --- a/filebeat/docs/inputs/input-tcp.asciidoc +++ b/filebeat/docs/inputs/input-tcp.asciidoc @@ -15,7 +15,7 @@ Example configuration: ---- {beatname_lc}.inputs: - type: tcp - max_message_size: 10240 + max_message_size: 10MiB host: "localhost:9000" ---- @@ -25,29 +25,7 @@ Example configuration: The `tcp` input supports the following configuration options plus the <<{beatname_lc}-input-{type}-common-options>> described later. -[float] -[id="{beatname_lc}-input-{type}-max-message-size"] -==== `max_message_size` - -The maximum size of the message received over TCP. The default is `20MiB`. - -[float] -[id="{beatname_lc}-input-{type}-host"] -==== `host` - -The host and TCP port to listen on for event streams. - -[float] -[id="{beatname_lc}-input-{type}-line-delimiter"] -==== `line_delimiter` - -Specify the characters used to split the incoming events. The default is '\n'. - -[float] -[id="{beatname_lc}-input-{type}-timeout"] -==== `timeout` - -The number of seconds of inactivity before a remote connection is closed. The default is `300s`. +include::../inputs/input-common-tcp-options.asciidoc[] [id="{beatname_lc}-input-{type}-common-options"] include::../inputs/input-common-options.asciidoc[] diff --git a/filebeat/docs/inputs/input-udp.asciidoc b/filebeat/docs/inputs/input-udp.asciidoc index 4b61df44c2cd..f6c63c828fed 100644 --- a/filebeat/docs/inputs/input-udp.asciidoc +++ b/filebeat/docs/inputs/input-udp.asciidoc @@ -25,17 +25,7 @@ Example configuration: The `udp` input supports the following configuration options plus the <<{beatname_lc}-input-{type}-common-options>> described later. -[float] -[id="{beatname_lc}-input-{type}-max-message-size"] -==== `max_message_size` - -The maximum size of the message received over UDP. The default is `10KiB`. - -[float] -[id="{beatname_lc}-input-{type}-host"] -==== `host` - -The host and UDP port to listen on for event streams. +include::../inputs/input-common-udp-options.asciidoc[] [id="{beatname_lc}-input-{type}-common-options"] include::../inputs/input-common-options.asciidoc[] diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 0beb673f5873..a991bcde087c 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -550,7 +550,7 @@ filebeat.inputs: # Maximum size of the message received over UDP #max_message_size: 10KiB -#------------------------------ TCP prospector -------------------------------- +#------------------------------ TCP input -------------------------------- # Experimental: Config options for the TCP input #- type: tcp #enabled: false @@ -567,6 +567,35 @@ filebeat.inputs: # The number of seconds of inactivity before a remote connection is closed. #timeout: 300s +#------------------------------ Syslog input -------------------------------- +# Experimental: Config options for the Syslog input +# Accept RFC3164 formatted syslog event via UDP. +#- type: syslog + #enabled: false + #protocol.udp: + # The host and port to receive the new event + #host: "localhost:9000" + + # Maximum size of the message received over UDP + #max_message_size: 10KiB + +# Accept RFC3164 formatted syslog event via TCP. +#- type: syslog + #enabled: false + + #protocol.tcp: + # The host and port to receive the new event + #host: "localhost:9000" + + # Character used to split new message + #line_delimiter: "\n" + + # Maximum size in bytes of the message received over TCP + #max_message_size: 20MiB + + # The number of seconds of inactivity before a remote connection is closed. + #timeout: 300s + #========================== Filebeat autodiscover ============================== # Autodiscover allows you to detect changes in the system and spawn new modules diff --git a/filebeat/include/list.go b/filebeat/include/list.go index 374be396eced..96e268503354 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -12,6 +12,7 @@ import ( _ "github.com/elastic/beats/filebeat/input/log" _ "github.com/elastic/beats/filebeat/input/redis" _ "github.com/elastic/beats/filebeat/input/stdin" + _ "github.com/elastic/beats/filebeat/input/syslog" _ "github.com/elastic/beats/filebeat/input/tcp" _ "github.com/elastic/beats/filebeat/input/udp" ) diff --git a/filebeat/input/syslog/config.go b/filebeat/input/syslog/config.go new file mode 100644 index 000000000000..1a3b12a12d41 --- /dev/null +++ b/filebeat/input/syslog/config.go @@ -0,0 +1,60 @@ +package syslog + +import ( + "fmt" + "time" + + "github.com/dustin/go-humanize" + + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/inputsource" + "github.com/elastic/beats/filebeat/inputsource/tcp" + "github.com/elastic/beats/filebeat/inputsource/udp" + "github.com/elastic/beats/libbeat/common" +) + +type config struct { + harvester.ForwarderConfig `config:",inline"` + Protocol common.ConfigNamespace `config:"protocol"` +} + +var defaultConfig = config{ + ForwarderConfig: harvester.ForwarderConfig{ + Type: "syslog", + }, +} + +var defaultTCP = tcp.Config{ + LineDelimiter: "\n", + Timeout: time.Minute * 5, + MaxMessageSize: 20 * humanize.MiByte, +} + +var defaultUDP = udp.Config{ + MaxMessageSize: 10 * humanize.KiByte, + Timeout: time.Minute * 5, +} + +func factory( + cb inputsource.NetworkFunc, + config common.ConfigNamespace, +) (inputsource.Network, error) { + n, cfg := config.Name(), config.Config() + + switch n { + case tcp.Name: + config := defaultTCP + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + return tcp.New(&config, cb) + case udp.Name: + config := defaultUDP + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + return udp.New(&config, cb), nil + default: + return nil, fmt.Errorf("you must choose between TCP or UDP") + } +} diff --git a/filebeat/input/syslog/event.go b/filebeat/input/syslog/event.go new file mode 100644 index 000000000000..dd5b31ac6224 --- /dev/null +++ b/filebeat/input/syslog/event.go @@ -0,0 +1,239 @@ +package syslog + +import ( + "time" +) + +const severityMask = 7 +const facilityShift = 3 + +var month = map[string]time.Month{ + "Jan": time.January, + "Feb": time.February, + "Mar": time.March, + "Apr": time.April, + "May": time.May, + "Jun": time.June, + "Jul": time.July, + "Aug": time.August, + "Sep": time.September, + "Oct": time.October, + "Nov": time.November, + "Dec": time.December, +} + +// event is a parsed syslog event, validation of the format is done at the parser level. +type event struct { + message string + hostname string //x + priority int + program string //x + pid int + month time.Month + day int + hour int + minute int + second int + nanosecond int + loc *time.Location +} + +// newEvent() return a new event. +func newEvent() *event { + return &event{ + priority: -1, + pid: -1, + month: -1, + day: -1, + hour: -1, + minute: -1, + second: -1, + } +} + +// SetMonth sets the month. +func (s *event) SetMonth(b []byte) { + var k string + if len(b) > 3 { + k = string(b[0:3]) + } else { + k = string(b) + } + v, ok := month[k] + if ok { + s.month = v + } +} + +// Month returns the month. +func (s *event) Month() time.Month { + return s.month +} + +// SetDay sets the day as. +func (s *event) SetDay(b []byte) { + s.day = bytesToInt(skipLeadZero(b)) +} + +// Day returns the day. +func (s *event) Day() int { + return s.day +} + +// SetHour sets the hour. +func (s *event) SetHour(b []byte) { + s.hour = bytesToInt(skipLeadZero(b)) +} + +// Hour returns the hour. +func (s *event) Hour() int { + return s.hour +} + +// SetMinute sets the minute. +func (s *event) SetMinute(b []byte) { + s.minute = bytesToInt(skipLeadZero(b)) +} + +// Minute return the minutes. +func (s *event) Minute() int { + return s.minute +} + +// SetSecond sets the second. +func (s *event) SetSecond(b []byte) { + s.second = bytesToInt(skipLeadZero(b)) +} + +// Second returns the second. +func (s *event) Second() int { + return s.second +} + +// Year returns the current year, since syslog events don't include that. +func (s *event) Year() int { + return time.Now().Year() +} + +// SetMessage sets the message. +func (s *event) SetMessage(b []byte) { + s.message = string(b) +} + +// Message returns the message. +func (s *event) Message() string { + return s.message +} + +// SetPriority sets the priority. +func (s *event) SetPriority(priority []byte) { + s.priority = bytesToInt(priority) +} + +// Priority returns the priority. +func (s *event) Priority() int { + return s.priority +} + +// HasPriority returns if the priority was in original event. +func (s *event) HasPriority() bool { + return s.priority > 0 +} + +// Severity returns the severity, will return -1 if priority is not set. +func (s *event) Severity() int { + if !s.HasPriority() { + return -1 + } + return s.Priority() & severityMask +} + +// Facility returns the facility, will return -1 if priority is not set. +func (s *event) Facility() int { + if !s.HasPriority() { + return -1 + } + return s.Priority() >> facilityShift +} + +// SetHostname sets the hostname. +func (s *event) SetHostname(b []byte) { + s.hostname = string(b) +} + +// Hostname returns the hostname. +func (s *event) Hostname() string { + return string(s.hostname) +} + +// SetProgram sets the programs as a byte slice. +func (s *event) SetProgram(b []byte) { + s.program = string(b) +} + +// Program returns the program name. +func (s *event) Program() string { + return s.program +} + +func (s *event) SetPid(b []byte) { + s.pid = bytesToInt(b) +} + +// Pid returns the pid. +func (s *event) Pid() int { + return s.pid +} + +// HasPid returns true if a pid is set. +func (s *event) HasPid() bool { + return s.pid > 0 +} + +// SetNanoSecond sets the nanosecond. +func (s *event) SetNanosecond(b []byte) { + s.nanosecond = bytesToInt(skipLeadZero(b)) +} + +// NanoSecond returns the nanosecond. +func (s *event) Nanosecond() int { + return s.nanosecond +} + +// Timestamp return the timestamp in UTC. +func (s *event) Timestamp(timezone *time.Location) time.Time { + return time.Date( + s.Year(), + s.Month(), + s.Day(), + s.Hour(), + s.Minute(), + s.Second(), + s.Nanosecond(), + timezone, + ).UTC() +} + +// IsValid returns true if the date and the message are present. +func (s *event) IsValid() bool { + return s.day != -1 && s.hour != -1 && s.minute != -1 && s.second != -1 && s.message != "" +} + +// BytesToInt takes a variable length of bytes and assume ascii chars and convert it to int, this is +// a simplified implementation of strconv.Atoi's fast path without error handling and remove the +// need to convert the byte array to string, we also assume that any errors are taken care at +// the parsing level. +func bytesToInt(b []byte) int { + var i int + for _, x := range b { + i = i*10 + int(x-'0') + } + return i +} + +func skipLeadZero(b []byte) []byte { + if len(b) > 1 && b[0] == '0' { + return b[1:len(b)] + } + return b +} diff --git a/filebeat/input/syslog/event_test.go b/filebeat/input/syslog/event_test.go new file mode 100644 index 000000000000..25a582537505 --- /dev/null +++ b/filebeat/input/syslog/event_test.go @@ -0,0 +1,91 @@ +package syslog + +import ( + "fmt" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSeverity(t *testing.T) { + e := newEvent() + e.SetPriority([]byte("13")) + assert.Equal(t, 5, e.Severity()) +} + +func TestFacility(t *testing.T) { + e := newEvent() + e.SetPriority([]byte("13")) + assert.Equal(t, 1, e.Facility()) +} + +func TestHasPriority(t *testing.T) { + e := newEvent() + e.SetPriority([]byte("13")) + assert.True(t, e.HasPriority()) + assert.Equal(t, 13, e.Priority()) + assert.Equal(t, 5, e.Severity()) + assert.Equal(t, 1, e.Facility()) +} + +func TestNoPrioritySet(t *testing.T) { + e := newEvent() + assert.False(t, e.HasPriority()) + assert.Equal(t, -1, e.Priority()) + assert.Equal(t, -1, e.Severity()) + assert.Equal(t, -1, e.Facility()) +} + +func TestHasPid(t *testing.T) { + e := newEvent() + assert.False(t, e.HasPid()) + e.SetPid([]byte(strconv.Itoa(20))) + assert.True(t, e.HasPid()) +} + +func TestDateParsing(t *testing.T) { + now := time.Now() + e := newEvent() + e.SetDay(itb(now.Day())) + e.SetMonth([]byte(now.Month().String())) + e.SetHour(itb(now.Hour())) + e.SetMinute(itb(now.Minute())) + e.SetSecond(itb(now.Second())) + e.SetNanosecond(itb(now.Nanosecond())) + new := e.Timestamp(time.Local) + assert.Equal(t, now.UTC(), new) +} + +func TestIsValid(t *testing.T) { + e := newEvent() + assert.False(t, e.IsValid()) + + now := time.Now() + + e.SetDay(itb(now.Day())) + assert.False(t, e.IsValid()) + + e.SetMonth([]byte(now.Month().String())) + assert.False(t, e.IsValid()) + + e.SetHour(itb(now.Hour())) + assert.False(t, e.IsValid()) + + e.SetMinute(itb(now.Minute())) + assert.False(t, e.IsValid()) + + e.SetSecond(itb(now.Second())) + assert.False(t, e.IsValid()) + + e.SetMessage([]byte("hello world")) + assert.True(t, e.IsValid()) +} + +func itb(i int) []byte { + if i < 10 { + return []byte(fmt.Sprintf("0%d", i)) + } + return []byte(strconv.Itoa(i)) +} diff --git a/filebeat/input/syslog/input.go b/filebeat/input/syslog/input.go new file mode 100644 index 000000000000..7208c0f2cad2 --- /dev/null +++ b/filebeat/input/syslog/input.go @@ -0,0 +1,225 @@ +package syslog + +import ( + "strings" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/elastic/beats/filebeat/channel" + "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/inputsource" + "github.com/elastic/beats/filebeat/util" + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" + "github.com/elastic/beats/libbeat/logp" +) + +// Parser is generated from a ragel state machine using the following command: +//go:generate ragel -Z -G2 parser.rl -o parser.go + +// Severity and Facility are derived from the priority, theses are the human readable terms +// defined in https://tools.ietf.org/html/rfc3164#section-4.1.1. +// +// Example: +// 2 => "Critial" +type mapper []string + +var ( + severityLabels = mapper{ + "Emergency", + "Alert", + "Critical", + "Error", + "Warning", + "Notice", + "Informational", + "Debug", + } + + facilityLabels = mapper{ + "kernel", + "user-level", + "mail", + "system", + "security/authorization", + "syslogd", + "line printer", + "network news", + "UUCP", + "clock", + "security/authorization", + "FTP", + "NTP", + "log audit", + "log alert", + "clock", + "local0", + "local1", + "local2", + "local3", + "local4", + "local5", + "local6", + "local7", + } +) + +func init() { + err := input.Register("syslog", NewInput) + if err != nil { + panic(err) + } +} + +// Input define a syslog input +type Input struct { + sync.Mutex + started bool + outlet channel.Outleter + server inputsource.Network + config *config + log *logp.Logger +} + +// NewInput creates a new syslog input +func NewInput( + cfg *common.Config, + outlet channel.Factory, + context input.Context, +) (input.Input, error) { + cfgwarn.Experimental("Syslog input type is used") + + log := logp.NewLogger("syslog") + + out, err := outlet(cfg, context.DynamicFields) + if err != nil { + return nil, err + } + + config := defaultConfig + if err = cfg.Unpack(&config); err != nil { + return nil, err + } + + forwarder := harvester.NewForwarder(out) + cb := func(data []byte, metadata inputsource.NetworkMetadata) { + ev := newEvent() + Parse(data, ev) + if !ev.IsValid() { + log.Errorw("can't not parse event as syslog rfc3164", "message", string(data)) + } + event := createEvent(ev, metadata, time.Local, log) + d := &util.Data{Event: *event} + forwarder.Send(d) + } + + server, err := factory(cb, config.Protocol) + if err != nil { + return nil, err + } + + return &Input{ + outlet: out, + started: false, + server: server, + config: &config, + log: log, + }, nil +} + +// Run starts listening for Syslog events over the network. +func (p *Input) Run() { + p.Lock() + defer p.Unlock() + + if !p.started { + p.log.Infow("Starting Syslog input", "protocol", p.config.Protocol.Name()) + err := p.server.Start() + if err != nil { + p.log.Error("Error starting the server", "error", err) + } + p.started = true + } +} + +// Stop stops the syslog input. +func (p *Input) Stop() { + defer p.outlet.Close() + p.Lock() + defer p.Unlock() + + p.log.Info("Stopping Syslog input") + p.server.Stop() + p.started = false +} + +// Wait stops the syslog input. +func (p *Input) Wait() { + p.Stop() +} + +func createEvent(ev *event, metadata inputsource.NetworkMetadata, timezone *time.Location, log *logp.Logger) *beat.Event { + f := common.MapStr{ + "message": strings.TrimRight(ev.Message(), "\n"), + "source": metadata.RemoteAddr.String(), + } + + syslog := common.MapStr{} + event := common.MapStr{} + process := common.MapStr{} + + if ev.Hostname() != "" { + f["hostname"] = ev.Hostname() + } + + if ev.HasPid() { + process["pid"] = ev.Pid() + } + + if ev.Program() != "" { + process["program"] = ev.Program() + } + + if ev.HasPriority() { + syslog["priority"] = ev.Priority() + + event["severity"] = ev.Severity() + v, err := mapValueToName(ev.Severity(), severityLabels) + if err != nil { + log.Debugw("could not find severity label", "error", err) + } else { + syslog["severity_label"] = v + } + + syslog["facility"] = ev.Facility() + v, err = mapValueToName(ev.Facility(), facilityLabels) + if err != nil { + log.Debugw("could not find facility label", "error", err) + } else { + syslog["facility_label"] = v + } + } + + f["syslog"] = syslog + f["event"] = event + f["process"] = process + + return &beat.Event{ + Timestamp: ev.Timestamp(timezone), + Meta: common.MapStr{ + "truncated": metadata.Truncated, + }, + Fields: f, + } +} + +func mapValueToName(v int, m mapper) (string, error) { + if v < 0 || v >= len(m) { + return "", errors.Errorf("value out of bound: %d", v) + } + return m[v], nil +} diff --git a/filebeat/input/syslog/input_test.go b/filebeat/input/syslog/input_test.go new file mode 100644 index 000000000000..be58cdab5710 --- /dev/null +++ b/filebeat/input/syslog/input_test.go @@ -0,0 +1,157 @@ +package syslog + +import ( + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/inputsource" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +func TestWhenPriorityIsSet(t *testing.T) { + e := newEvent() + e.SetPriority([]byte("13")) + e.SetMessage([]byte("hello world")) + e.SetHostname([]byte("wopr")) + e.SetPid([]byte("123")) + + m := dummyMetadata() + event := createEvent(e, m, time.Local, logp.NewLogger("syslog")) + + expected := common.MapStr{ + "source": "127.0.0.1", + "message": "hello world", + "hostname": "wopr", + "process": common.MapStr{ + "pid": 123, + }, + "event": common.MapStr{ + "severity": 5, + }, + "syslog": common.MapStr{ + "facility": 1, + "severity_label": "Notice", + "facility_label": "user-level", + "priority": 13, + }, + } + + assert.Equal(t, expected, event.Fields) +} + +func TestWhenPriorityIsNotSet(t *testing.T) { + e := newEvent() + e.SetMessage([]byte("hello world")) + e.SetHostname([]byte("wopr")) + e.SetPid([]byte("123")) + + m := dummyMetadata() + event := createEvent(e, m, time.Local, logp.NewLogger("syslog")) + expected := common.MapStr{ + "source": "127.0.0.1", + "message": "hello world", + "hostname": "wopr", + "process": common.MapStr{ + "pid": 123, + }, + "event": common.MapStr{}, + "syslog": common.MapStr{}, + } + + assert.Equal(t, expected, event.Fields) +} + +func TestPid(t *testing.T) { + t.Run("is set", func(t *testing.T) { + e := newEvent() + e.SetMessage([]byte("hello world")) + e.SetPid([]byte("123")) + m := dummyMetadata() + event := createEvent(e, m, time.Local, logp.NewLogger("syslog")) + v, err := event.GetValue("process") + if !assert.NoError(t, err) { + return + } + assert.Equal(t, common.MapStr{"pid": 123}, v) + }) + + t.Run("is not set", func(t *testing.T) { + e := newEvent() + e.SetMessage([]byte("hello world")) + m := dummyMetadata() + event := createEvent(e, m, time.Local, logp.NewLogger("syslog")) + + v, err := event.GetValue("process") + if !assert.NoError(t, err) { + return + } + assert.Equal(t, common.MapStr{}, v) + }) +} + +func TestHostname(t *testing.T) { + t.Run("is set", func(t *testing.T) { + e := newEvent() + e.SetMessage([]byte("hello world")) + e.SetHostname([]byte("wopr")) + m := dummyMetadata() + event := createEvent(e, m, time.Local, logp.NewLogger("syslog")) + v, err := event.GetValue("hostname") + if !assert.NoError(t, err) { + return + } + assert.Equal(t, "wopr", v) + }) + + t.Run("is not set", func(t *testing.T) { + e := newEvent() + e.SetMessage([]byte("hello world")) + m := dummyMetadata() + event := createEvent(e, m, time.Local, logp.NewLogger("syslog")) + + _, err := event.GetValue("hostname") + if !assert.Error(t, err) { + return + } + }) +} + +func TestProgram(t *testing.T) { + t.Run("is set", func(t *testing.T) { + e := newEvent() + e.SetMessage([]byte("hello world")) + e.SetProgram([]byte("sudo")) + m := dummyMetadata() + event := createEvent(e, m, time.Local, logp.NewLogger("syslog")) + v, err := event.GetValue("process") + if !assert.NoError(t, err) { + return + } + assert.Equal(t, common.MapStr{"program": "sudo"}, v) + }) + + t.Run("is not set", func(t *testing.T) { + e := newEvent() + e.SetMessage([]byte("hello world")) + m := dummyMetadata() + event := createEvent(e, m, time.Local, logp.NewLogger("syslog")) + + v, err := event.GetValue("process") + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, common.MapStr{}, v) + }) +} + +func dummyMetadata() inputsource.NetworkMetadata { + ip := "127.0.0.1" + parsedIP := net.ParseIP(ip) + addr := &net.IPAddr{IP: parsedIP, Zone: ""} + return inputsource.NetworkMetadata{RemoteAddr: addr} +} diff --git a/filebeat/input/syslog/parser.go b/filebeat/input/syslog/parser.go new file mode 100644 index 000000000000..7e6e29fbe387 --- /dev/null +++ b/filebeat/input/syslog/parser.go @@ -0,0 +1,1538 @@ +//line parser.rl:1 + +// Code generated by ragel DO NOT EDIT. +package syslog + +//line parser.go:8 +const syslog_start int = 0 +const syslog_first_final int = 1 +const syslog_error int = -1 + +const syslog_en_main int = 0 + +//line parser.rl:9 + +// syslog +//<34>Oct 11 22:14:15 wopr su: 'su root' failed for foobar +//<13>Feb 5 17:32:18 10.0.0.99 Use the quad dmg. +func Parse(data []byte, event *event) { + var p, cs int + pe := len(data) + tok := 0 + eof := len(data) + +//line parser.go:28 + { + cs = syslog_start + } + +//line parser.go:33 + { + if (p) == (pe) { + goto _test_eof + } + switch cs { + case 0: + goto st_case_0 + case 1: + goto st_case_1 + case 2: + goto st_case_2 + case 3: + goto st_case_3 + case 4: + goto st_case_4 + case 5: + goto st_case_5 + case 6: + goto st_case_6 + case 7: + goto st_case_7 + case 8: + goto st_case_8 + case 9: + goto st_case_9 + case 10: + goto st_case_10 + case 11: + goto st_case_11 + case 12: + goto st_case_12 + case 13: + goto st_case_13 + case 14: + goto st_case_14 + case 15: + goto st_case_15 + case 16: + goto st_case_16 + case 17: + goto st_case_17 + case 18: + goto st_case_18 + case 19: + goto st_case_19 + case 20: + goto st_case_20 + case 21: + goto st_case_21 + case 22: + goto st_case_22 + case 23: + goto st_case_23 + case 24: + goto st_case_24 + case 25: + goto st_case_25 + case 26: + goto st_case_26 + case 27: + goto st_case_27 + case 28: + goto st_case_28 + case 29: + goto st_case_29 + case 30: + goto st_case_30 + case 31: + goto st_case_31 + case 32: + goto st_case_32 + case 33: + goto st_case_33 + case 34: + goto st_case_34 + case 35: + goto st_case_35 + case 36: + goto st_case_36 + case 37: + goto st_case_37 + case 38: + goto st_case_38 + case 39: + goto st_case_39 + case 40: + goto st_case_40 + case 41: + goto st_case_41 + case 42: + goto st_case_42 + case 43: + goto st_case_43 + case 44: + goto st_case_44 + case 45: + goto st_case_45 + case 46: + goto st_case_46 + case 47: + goto st_case_47 + case 48: + goto st_case_48 + case 49: + goto st_case_49 + case 50: + goto st_case_50 + case 51: + goto st_case_51 + case 52: + goto st_case_52 + case 53: + goto st_case_53 + case 54: + goto st_case_54 + case 55: + goto st_case_55 + case 56: + goto st_case_56 + case 57: + goto st_case_57 + case 58: + goto st_case_58 + case 59: + goto st_case_59 + case 60: + goto st_case_60 + case 61: + goto st_case_61 + case 62: + goto st_case_62 + case 63: + goto st_case_63 + case 64: + goto st_case_64 + case 65: + goto st_case_65 + case 66: + goto st_case_66 + case 67: + goto st_case_67 + case 68: + goto st_case_68 + case 69: + goto st_case_69 + case 70: + goto st_case_70 + case 71: + goto st_case_71 + case 72: + goto st_case_72 + case 73: + goto st_case_73 + case 74: + goto st_case_74 + case 75: + goto st_case_75 + case 76: + goto st_case_76 + } + goto st_out + st_case_0: + switch data[(p)] { + case 60: + goto tr1 + case 65: + goto tr2 + case 70: + goto tr3 + case 74: + goto tr4 + case 77: + goto tr5 + case 78: + goto tr6 + case 79: + goto tr7 + case 83: + goto tr8 + case 101: + goto tr9 + } + goto tr0 + tr0: +//line parser.rl:20 + tok = p + + goto st1 + st1: + if (p)++; (p) == (pe) { + goto _test_eof1 + } + st_case_1: +//line parser.go:228 + goto st1 + tr1: +//line parser.rl:20 + tok = p + + goto st2 + st2: + if (p)++; (p) == (pe) { + goto _test_eof2 + } + st_case_2: +//line parser.go:241 + if 48 <= data[(p)] && data[(p)] <= 57 { + goto tr11 + } + goto st1 + tr11: +//line parser.rl:20 + tok = p + + goto st3 + st3: + if (p)++; (p) == (pe) { + goto _test_eof3 + } + st_case_3: +//line parser.go:257 + if data[(p)] == 62 { + goto tr13 + } + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st4 + } + goto st1 + st4: + if (p)++; (p) == (pe) { + goto _test_eof4 + } + st_case_4: + if data[(p)] == 62 { + goto tr13 + } + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st5 + } + goto st1 + st5: + if (p)++; (p) == (pe) { + goto _test_eof5 + } + st_case_5: + if data[(p)] == 62 { + goto tr13 + } + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st6 + } + goto st1 + st6: + if (p)++; (p) == (pe) { + goto _test_eof6 + } + st_case_6: + if data[(p)] == 62 { + goto tr13 + } + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st7 + } + goto st1 + st7: + if (p)++; (p) == (pe) { + goto _test_eof7 + } + st_case_7: + if data[(p)] == 62 { + goto tr13 + } + goto st1 + tr13: +//line parser.rl:24 + event.SetPriority(data[tok:p]) + + goto st8 + st8: + if (p)++; (p) == (pe) { + goto _test_eof8 + } + st_case_8: +//line parser.go:321 + switch data[(p)] { + case 65: + goto tr2 + case 70: + goto tr3 + case 74: + goto tr4 + case 77: + goto tr5 + case 78: + goto tr6 + case 79: + goto tr7 + case 83: + goto tr8 + case 101: + goto tr9 + } + goto tr0 + tr2: +//line parser.rl:20 + tok = p + + goto st9 + st9: + if (p)++; (p) == (pe) { + goto _test_eof9 + } + st_case_9: +//line parser.go:352 + switch data[(p)] { + case 112: + goto st10 + case 117: + goto st41 + } + goto st1 + st10: + if (p)++; (p) == (pe) { + goto _test_eof10 + } + st_case_10: + if data[(p)] == 114 { + goto st11 + } + goto st1 + st11: + if (p)++; (p) == (pe) { + goto _test_eof11 + } + st_case_11: + switch data[(p)] { + case 32: + goto tr20 + case 105: + goto st39 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + tr20: +//line parser.rl:32 + event.SetMonth(data[tok:p]) + + goto st12 + st12: + if (p)++; (p) == (pe) { + goto _test_eof12 + } + st_case_12: +//line parser.go:395 + switch data[(p)] { + case 32: + goto st13 + case 51: + goto tr24 + } + switch { + case data[(p)] < 49: + if 9 <= data[(p)] && data[(p)] <= 13 { + goto st13 + } + case data[(p)] > 50: + if 52 <= data[(p)] && data[(p)] <= 57 { + goto tr25 + } + default: + goto tr23 + } + goto st1 + st13: + if (p)++; (p) == (pe) { + goto _test_eof13 + } + st_case_13: + if 49 <= data[(p)] && data[(p)] <= 57 { + goto tr25 + } + goto st1 + tr25: +//line parser.rl:20 + tok = p + + goto st14 + st14: + if (p)++; (p) == (pe) { + goto _test_eof14 + } + st_case_14: +//line parser.go:435 + if data[(p)] == 32 { + goto tr26 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr26 + } + goto st1 + tr26: +//line parser.rl:36 + event.SetDay(data[tok:p]) + + goto st15 + st15: + if (p)++; (p) == (pe) { + goto _test_eof15 + } + st_case_15: +//line parser.go:454 + if data[(p)] == 50 { + goto tr28 + } + if 48 <= data[(p)] && data[(p)] <= 49 { + goto tr27 + } + goto st1 + tr27: +//line parser.rl:20 + tok = p + + goto st16 + st16: + if (p)++; (p) == (pe) { + goto _test_eof16 + } + st_case_16: +//line parser.go:473 + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st17 + } + goto st1 + st17: + if (p)++; (p) == (pe) { + goto _test_eof17 + } + st_case_17: + if data[(p)] == 58 { + goto tr30 + } + goto st1 + tr30: +//line parser.rl:40 + event.SetHour(data[tok:p]) + + goto st18 + st18: + if (p)++; (p) == (pe) { + goto _test_eof18 + } + st_case_18: +//line parser.go:498 + if 48 <= data[(p)] && data[(p)] <= 53 { + goto tr31 + } + goto st1 + tr31: +//line parser.rl:20 + tok = p + + goto st19 + st19: + if (p)++; (p) == (pe) { + goto _test_eof19 + } + st_case_19: +//line parser.go:514 + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st20 + } + goto st1 + st20: + if (p)++; (p) == (pe) { + goto _test_eof20 + } + st_case_20: + if data[(p)] == 58 { + goto tr33 + } + goto st1 + tr33: +//line parser.rl:44 + event.SetMinute(data[tok:p]) + + goto st21 + st21: + if (p)++; (p) == (pe) { + goto _test_eof21 + } + st_case_21: +//line parser.go:539 + if 48 <= data[(p)] && data[(p)] <= 53 { + goto tr34 + } + goto st1 + tr34: +//line parser.rl:20 + tok = p + + goto st22 + st22: + if (p)++; (p) == (pe) { + goto _test_eof22 + } + st_case_22: +//line parser.go:555 + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st23 + } + goto st1 + st23: + if (p)++; (p) == (pe) { + goto _test_eof23 + } + st_case_23: + switch data[(p)] { + case 32: + goto tr36 + case 46: + goto tr37 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr36 + } + goto st1 + tr36: +//line parser.rl:48 + event.SetSecond(data[tok:p]) + + goto st24 + st24: + if (p)++; (p) == (pe) { + goto _test_eof24 + } + st_case_24: +//line parser.go:586 + switch { + case data[(p)] > 95: + if 97 <= data[(p)] && data[(p)] <= 122 { + goto tr38 + } + case data[(p)] >= 46: + goto tr38 + } + goto tr0 + tr38: +//line parser.rl:20 + tok = p + + goto st25 + st25: + if (p)++; (p) == (pe) { + goto _test_eof25 + } + st_case_25: +//line parser.go:607 + if data[(p)] == 32 { + goto tr39 + } + switch { + case data[(p)] < 46: + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr39 + } + case data[(p)] > 95: + if 97 <= data[(p)] && data[(p)] <= 122 { + goto st25 + } + default: + goto st25 + } + goto st1 + tr39: +//line parser.rl:56 + event.SetHostname(data[tok:p]) + + goto st26 + st26: + if (p)++; (p) == (pe) { + goto _test_eof26 + } + st_case_26: +//line parser.go:635 + switch data[(p)] { + case 32: + goto tr0 + case 91: + goto tr0 + case 93: + goto tr0 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr0 + } + goto tr41 + tr41: +//line parser.rl:20 + tok = p + + goto st27 + st27: + if (p)++; (p) == (pe) { + goto _test_eof27 + } + st_case_27: +//line parser.go:659 + switch data[(p)] { + case 32: + goto st1 + case 58: + goto tr43 + case 91: + goto tr44 + case 93: + goto st1 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto st1 + } + goto st27 + tr43: +//line parser.rl:60 + event.SetProgram(data[tok:p]) + + goto st28 + st28: + if (p)++; (p) == (pe) { + goto _test_eof28 + } + st_case_28: +//line parser.go:685 + switch data[(p)] { + case 32: + goto st29 + case 58: + goto tr43 + case 91: + goto tr44 + case 93: + goto st1 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto st29 + } + goto st27 + st29: + if (p)++; (p) == (pe) { + goto _test_eof29 + } + st_case_29: + goto tr0 + tr44: +//line parser.rl:60 + event.SetProgram(data[tok:p]) + + goto st30 + st30: + if (p)++; (p) == (pe) { + goto _test_eof30 + } + st_case_30: +//line parser.go:717 + if 48 <= data[(p)] && data[(p)] <= 57 { + goto tr46 + } + goto st1 + tr46: +//line parser.rl:20 + tok = p + + goto st31 + st31: + if (p)++; (p) == (pe) { + goto _test_eof31 + } + st_case_31: +//line parser.go:733 + if data[(p)] == 93 { + goto tr48 + } + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st31 + } + goto st1 + tr48: +//line parser.rl:64 + event.SetPid(data[tok:p]) + + goto st32 + st32: + if (p)++; (p) == (pe) { + goto _test_eof32 + } + st_case_32: +//line parser.go:752 + if data[(p)] == 58 { + goto st33 + } + goto st1 + st33: + if (p)++; (p) == (pe) { + goto _test_eof33 + } + st_case_33: + if data[(p)] == 32 { + goto st29 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto st29 + } + goto st1 + tr37: +//line parser.rl:48 + event.SetSecond(data[tok:p]) + + goto st34 + st34: + if (p)++; (p) == (pe) { + goto _test_eof34 + } + st_case_34: +//line parser.go:780 + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st35 + } + goto st1 + st35: + if (p)++; (p) == (pe) { + goto _test_eof35 + } + st_case_35: + if data[(p)] == 32 { + goto st24 + } + switch { + case data[(p)] > 13: + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st35 + } + case data[(p)] >= 9: + goto st24 + } + goto st1 + tr28: +//line parser.rl:20 + tok = p + + goto st36 + st36: + if (p)++; (p) == (pe) { + goto _test_eof36 + } + st_case_36: +//line parser.go:813 + if 48 <= data[(p)] && data[(p)] <= 51 { + goto st17 + } + goto st1 + tr23: +//line parser.rl:20 + tok = p + + goto st37 + st37: + if (p)++; (p) == (pe) { + goto _test_eof37 + } + st_case_37: +//line parser.go:829 + if data[(p)] == 32 { + goto tr26 + } + switch { + case data[(p)] > 13: + if 48 <= data[(p)] && data[(p)] <= 57 { + goto st14 + } + case data[(p)] >= 9: + goto tr26 + } + goto st1 + tr24: +//line parser.rl:20 + tok = p + + goto st38 + st38: + if (p)++; (p) == (pe) { + goto _test_eof38 + } + st_case_38: +//line parser.go:853 + if data[(p)] == 32 { + goto tr26 + } + switch { + case data[(p)] > 13: + if 48 <= data[(p)] && data[(p)] <= 49 { + goto st14 + } + case data[(p)] >= 9: + goto tr26 + } + goto st1 + st39: + if (p)++; (p) == (pe) { + goto _test_eof39 + } + st_case_39: + if data[(p)] == 108 { + goto st40 + } + goto st1 + st40: + if (p)++; (p) == (pe) { + goto _test_eof40 + } + st_case_40: + if data[(p)] == 32 { + goto tr20 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st41: + if (p)++; (p) == (pe) { + goto _test_eof41 + } + st_case_41: + if data[(p)] == 103 { + goto st42 + } + goto st1 + st42: + if (p)++; (p) == (pe) { + goto _test_eof42 + } + st_case_42: + switch data[(p)] { + case 32: + goto tr20 + case 117: + goto st43 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st43: + if (p)++; (p) == (pe) { + goto _test_eof43 + } + st_case_43: + if data[(p)] == 115 { + goto st44 + } + goto st1 + st44: + if (p)++; (p) == (pe) { + goto _test_eof44 + } + st_case_44: + if data[(p)] == 116 { + goto st40 + } + goto st1 + tr3: +//line parser.rl:20 + tok = p + + goto st45 + st45: + if (p)++; (p) == (pe) { + goto _test_eof45 + } + st_case_45: +//line parser.go:940 + if data[(p)] == 101 { + goto st46 + } + goto st1 + st46: + if (p)++; (p) == (pe) { + goto _test_eof46 + } + st_case_46: + if data[(p)] == 98 { + goto st47 + } + goto st1 + st47: + if (p)++; (p) == (pe) { + goto _test_eof47 + } + st_case_47: + switch data[(p)] { + case 32: + goto tr20 + case 114: + goto st48 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st48: + if (p)++; (p) == (pe) { + goto _test_eof48 + } + st_case_48: + if data[(p)] == 117 { + goto st49 + } + goto st1 + st49: + if (p)++; (p) == (pe) { + goto _test_eof49 + } + st_case_49: + if data[(p)] == 97 { + goto st50 + } + goto st1 + st50: + if (p)++; (p) == (pe) { + goto _test_eof50 + } + st_case_50: + if data[(p)] == 114 { + goto st51 + } + goto st1 + st51: + if (p)++; (p) == (pe) { + goto _test_eof51 + } + st_case_51: + if data[(p)] == 121 { + goto st40 + } + goto st1 + tr4: +//line parser.rl:20 + tok = p + + goto st52 + st52: + if (p)++; (p) == (pe) { + goto _test_eof52 + } + st_case_52: +//line parser.go:1016 + switch data[(p)] { + case 97: + goto st53 + case 117: + goto st55 + } + goto st1 + st53: + if (p)++; (p) == (pe) { + goto _test_eof53 + } + st_case_53: + if data[(p)] == 110 { + goto st54 + } + goto st1 + st54: + if (p)++; (p) == (pe) { + goto _test_eof54 + } + st_case_54: + switch data[(p)] { + case 32: + goto tr20 + case 117: + goto st49 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st55: + if (p)++; (p) == (pe) { + goto _test_eof55 + } + st_case_55: + switch data[(p)] { + case 108: + goto st56 + case 110: + goto st57 + } + goto st1 + st56: + if (p)++; (p) == (pe) { + goto _test_eof56 + } + st_case_56: + switch data[(p)] { + case 32: + goto tr20 + case 121: + goto st40 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st57: + if (p)++; (p) == (pe) { + goto _test_eof57 + } + st_case_57: + switch data[(p)] { + case 32: + goto tr20 + case 101: + goto st40 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + tr5: +//line parser.rl:20 + tok = p + + goto st58 + st58: + if (p)++; (p) == (pe) { + goto _test_eof58 + } + st_case_58: +//line parser.go:1101 + if data[(p)] == 97 { + goto st59 + } + goto st1 + st59: + if (p)++; (p) == (pe) { + goto _test_eof59 + } + st_case_59: + switch data[(p)] { + case 32: + goto tr20 + case 114: + goto st60 + case 121: + goto st40 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st60: + if (p)++; (p) == (pe) { + goto _test_eof60 + } + st_case_60: + switch data[(p)] { + case 32: + goto tr20 + case 99: + goto st61 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st61: + if (p)++; (p) == (pe) { + goto _test_eof61 + } + st_case_61: + if data[(p)] == 104 { + goto st40 + } + goto st1 + tr6: +//line parser.rl:20 + tok = p + + goto st62 + st62: + if (p)++; (p) == (pe) { + goto _test_eof62 + } + st_case_62: +//line parser.go:1158 + if data[(p)] == 111 { + goto st63 + } + goto st1 + st63: + if (p)++; (p) == (pe) { + goto _test_eof63 + } + st_case_63: + if data[(p)] == 118 { + goto st64 + } + goto st1 + st64: + if (p)++; (p) == (pe) { + goto _test_eof64 + } + st_case_64: + switch data[(p)] { + case 32: + goto tr20 + case 101: + goto st65 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st65: + if (p)++; (p) == (pe) { + goto _test_eof65 + } + st_case_65: + if data[(p)] == 109 { + goto st66 + } + goto st1 + st66: + if (p)++; (p) == (pe) { + goto _test_eof66 + } + st_case_66: + if data[(p)] == 98 { + goto st67 + } + goto st1 + st67: + if (p)++; (p) == (pe) { + goto _test_eof67 + } + st_case_67: + if data[(p)] == 101 { + goto st68 + } + goto st1 + st68: + if (p)++; (p) == (pe) { + goto _test_eof68 + } + st_case_68: + if data[(p)] == 114 { + goto st40 + } + goto st1 + tr7: +//line parser.rl:20 + tok = p + + goto st69 + st69: + if (p)++; (p) == (pe) { + goto _test_eof69 + } + st_case_69: +//line parser.go:1234 + if data[(p)] == 99 { + goto st70 + } + goto st1 + st70: + if (p)++; (p) == (pe) { + goto _test_eof70 + } + st_case_70: + if data[(p)] == 116 { + goto st71 + } + goto st1 + st71: + if (p)++; (p) == (pe) { + goto _test_eof71 + } + st_case_71: + switch data[(p)] { + case 32: + goto tr20 + case 111: + goto st66 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + tr8: +//line parser.rl:20 + tok = p + + goto st72 + st72: + if (p)++; (p) == (pe) { + goto _test_eof72 + } + st_case_72: +//line parser.go:1274 + if data[(p)] == 101 { + goto st73 + } + goto st1 + st73: + if (p)++; (p) == (pe) { + goto _test_eof73 + } + st_case_73: + if data[(p)] == 112 { + goto st74 + } + goto st1 + st74: + if (p)++; (p) == (pe) { + goto _test_eof74 + } + st_case_74: + switch data[(p)] { + case 32: + goto tr20 + case 116: + goto st75 + } + if 9 <= data[(p)] && data[(p)] <= 13 { + goto tr20 + } + goto st1 + st75: + if (p)++; (p) == (pe) { + goto _test_eof75 + } + st_case_75: + if data[(p)] == 101 { + goto st65 + } + goto st1 + tr9: +//line parser.rl:20 + tok = p + + goto st76 + st76: + if (p)++; (p) == (pe) { + goto _test_eof76 + } + st_case_76: +//line parser.go:1323 + if data[(p)] == 99 { + goto st64 + } + goto st1 + st_out: + _test_eof1: + cs = 1 + goto _test_eof + _test_eof2: + cs = 2 + goto _test_eof + _test_eof3: + cs = 3 + goto _test_eof + _test_eof4: + cs = 4 + goto _test_eof + _test_eof5: + cs = 5 + goto _test_eof + _test_eof6: + cs = 6 + goto _test_eof + _test_eof7: + cs = 7 + goto _test_eof + _test_eof8: + cs = 8 + goto _test_eof + _test_eof9: + cs = 9 + goto _test_eof + _test_eof10: + cs = 10 + goto _test_eof + _test_eof11: + cs = 11 + goto _test_eof + _test_eof12: + cs = 12 + goto _test_eof + _test_eof13: + cs = 13 + goto _test_eof + _test_eof14: + cs = 14 + goto _test_eof + _test_eof15: + cs = 15 + goto _test_eof + _test_eof16: + cs = 16 + goto _test_eof + _test_eof17: + cs = 17 + goto _test_eof + _test_eof18: + cs = 18 + goto _test_eof + _test_eof19: + cs = 19 + goto _test_eof + _test_eof20: + cs = 20 + goto _test_eof + _test_eof21: + cs = 21 + goto _test_eof + _test_eof22: + cs = 22 + goto _test_eof + _test_eof23: + cs = 23 + goto _test_eof + _test_eof24: + cs = 24 + goto _test_eof + _test_eof25: + cs = 25 + goto _test_eof + _test_eof26: + cs = 26 + goto _test_eof + _test_eof27: + cs = 27 + goto _test_eof + _test_eof28: + cs = 28 + goto _test_eof + _test_eof29: + cs = 29 + goto _test_eof + _test_eof30: + cs = 30 + goto _test_eof + _test_eof31: + cs = 31 + goto _test_eof + _test_eof32: + cs = 32 + goto _test_eof + _test_eof33: + cs = 33 + goto _test_eof + _test_eof34: + cs = 34 + goto _test_eof + _test_eof35: + cs = 35 + goto _test_eof + _test_eof36: + cs = 36 + goto _test_eof + _test_eof37: + cs = 37 + goto _test_eof + _test_eof38: + cs = 38 + goto _test_eof + _test_eof39: + cs = 39 + goto _test_eof + _test_eof40: + cs = 40 + goto _test_eof + _test_eof41: + cs = 41 + goto _test_eof + _test_eof42: + cs = 42 + goto _test_eof + _test_eof43: + cs = 43 + goto _test_eof + _test_eof44: + cs = 44 + goto _test_eof + _test_eof45: + cs = 45 + goto _test_eof + _test_eof46: + cs = 46 + goto _test_eof + _test_eof47: + cs = 47 + goto _test_eof + _test_eof48: + cs = 48 + goto _test_eof + _test_eof49: + cs = 49 + goto _test_eof + _test_eof50: + cs = 50 + goto _test_eof + _test_eof51: + cs = 51 + goto _test_eof + _test_eof52: + cs = 52 + goto _test_eof + _test_eof53: + cs = 53 + goto _test_eof + _test_eof54: + cs = 54 + goto _test_eof + _test_eof55: + cs = 55 + goto _test_eof + _test_eof56: + cs = 56 + goto _test_eof + _test_eof57: + cs = 57 + goto _test_eof + _test_eof58: + cs = 58 + goto _test_eof + _test_eof59: + cs = 59 + goto _test_eof + _test_eof60: + cs = 60 + goto _test_eof + _test_eof61: + cs = 61 + goto _test_eof + _test_eof62: + cs = 62 + goto _test_eof + _test_eof63: + cs = 63 + goto _test_eof + _test_eof64: + cs = 64 + goto _test_eof + _test_eof65: + cs = 65 + goto _test_eof + _test_eof66: + cs = 66 + goto _test_eof + _test_eof67: + cs = 67 + goto _test_eof + _test_eof68: + cs = 68 + goto _test_eof + _test_eof69: + cs = 69 + goto _test_eof + _test_eof70: + cs = 70 + goto _test_eof + _test_eof71: + cs = 71 + goto _test_eof + _test_eof72: + cs = 72 + goto _test_eof + _test_eof73: + cs = 73 + goto _test_eof + _test_eof74: + cs = 74 + goto _test_eof + _test_eof75: + cs = 75 + goto _test_eof + _test_eof76: + cs = 76 + goto _test_eof + + _test_eof: + { + } + if (p) == eof { + switch cs { + case 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76: +//line parser.rl:28 + event.SetMessage(data[tok:p]) + +//line parser.go:1414 + } + } + + } + +//line parser.rl:72 +} diff --git a/filebeat/input/syslog/parser.rl b/filebeat/input/syslog/parser.rl new file mode 100644 index 000000000000..bbf4629258e6 --- /dev/null +++ b/filebeat/input/syslog/parser.rl @@ -0,0 +1,73 @@ +// Code generated by ragel DO NOT EDIT. +package syslog + +%%{ + machine syslog; + write data; + variable p p; + variable pe pe; +}%% + +// syslog +//<34>Oct 11 22:14:15 wopr su: 'su root' failed for foobar +//<13>Feb 5 17:32:18 10.0.0.99 Use the quad dmg. +func Parse(data []byte, event *event) { + var p, cs int + pe := len(data) + tok := 0 + eof := len(data) + %%{ + action tok { + tok = p + } + + action priority { + event.SetPriority(data[tok:p]) + } + + action message { + event.SetMessage(data[tok:p]) + } + + action month { + event.SetMonth(data[tok:p]) + } + + action day { + event.SetDay(data[tok:p]) + } + + action hour { + event.SetHour(data[tok:p]) + } + + action minute { + event.SetMinute(data[tok:p]) + } + + action second { + event.SetSecond(data[tok:p]) + } + + action nanosecond{ + event.SetNanosecond(data[tok:p]) + } + + action hostname { + event.SetHostname(data[tok:p]) + } + + action program { + event.SetProgram(data[tok:p]) + } + + action pid { + event.SetPid(data[tok:p]) + } + + include syslog_rfc3164 "syslog_rfc3164.rl"; + + write init; + write exec; + }%% +} diff --git a/filebeat/input/syslog/parser_test.go b/filebeat/input/syslog/parser_test.go new file mode 100644 index 000000000000..3cfd581d148c --- /dev/null +++ b/filebeat/input/syslog/parser_test.go @@ -0,0 +1,316 @@ +package syslog + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseSyslog(t *testing.T) { + tests := []struct { + title string + log []byte + syslog event + }{ + { + title: "message only", + log: []byte("--- last message repeated 1 time ---"), + syslog: event{ + priority: -1, + message: "--- last message repeated 1 time ---", + hostname: "", + program: "", + pid: -1, + month: -1, + day: -1, + hour: -1, + minute: -1, + second: -1, + }, + }, + { + title: "time and message only", + log: []byte("Oct 11 22:14:15 --- last message repeated 1 time ---"), + syslog: event{ + priority: -1, + message: "--- last message repeated 1 time ---", + hostname: "", + program: "", + pid: -1, + month: 10, + day: 11, + hour: 22, + minute: 14, + second: 15, + }, + }, + { + title: "No priority defined", + log: []byte("Oct 11 22:14:15 mymachine su[230]: 'su root' failed for lonvick on /dev/pts/8"), + syslog: event{ + priority: -1, + message: "'su root' failed for lonvick on /dev/pts/8", + hostname: "mymachine", + program: "su", + pid: 230, + month: 10, + day: 11, + hour: 22, + minute: 14, + second: 15, + }, + }, + { + log: []byte("<34>Oct 11 22:14:15 mymachine su[230]: 'su root' failed for lonvick on /dev/pts/8"), + syslog: event{ + priority: 34, + message: "'su root' failed for lonvick on /dev/pts/8", + hostname: "mymachine", + program: "su", + pid: 230, + month: 10, + day: 11, + hour: 22, + minute: 14, + second: 15, + }, + }, + { + log: []byte("<34>Oct 11 22:14:15.57643 mymachine su: 'su root' failed for lonvick on /dev/pts/8"), + syslog: event{ + priority: 34, + message: "'su root' failed for lonvick on /dev/pts/8", + hostname: "mymachine", + program: "su", + pid: -1, + month: 10, + day: 11, + hour: 22, + minute: 14, + second: 15, + nanosecond: 57643, + }, + }, + { + log: []byte("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8"), + syslog: event{ + priority: 34, + message: "'su root' failed for lonvick on /dev/pts/8", + hostname: "mymachine", + program: "su", + pid: -1, + month: 10, + day: 11, + hour: 22, + minute: 14, + second: 15, + }, + }, + { + log: []byte("<34>Oct 11 22:14:15 mymachine postfix/smtpd[2000]: 'su root' failed for lonvick on /dev/pts/8"), + syslog: event{ + priority: 34, + message: "'su root' failed for lonvick on /dev/pts/8", + hostname: "mymachine", + program: "postfix/smtpd", + pid: 2000, + month: 10, + day: 11, + hour: 22, + minute: 14, + second: 15, + }, + }, + { + log: []byte("<34>Oct 11 22:14:15 wopr.mymachine.co postfix/smtpd[2000]: 'su root' failed for lonvick on /dev/pts/8"), + syslog: event{ + priority: 34, + message: "'su root' failed for lonvick on /dev/pts/8", + hostname: "wopr.mymachine.co", + program: "postfix/smtpd", + pid: 2000, + month: 10, + day: 11, + hour: 22, + minute: 14, + second: 15, + }, + }, + { + log: []byte("<13>Feb 25 17:32:18 10.0.0.99 Use the Force!"), + syslog: event{ + message: "Use the Force!", + hostname: "10.0.0.99", + priority: 13, + pid: -1, + month: 2, + day: 25, + hour: 17, + minute: 32, + second: 18, + }, + }, + { + title: "Check relay + hostname alpha", + log: []byte("<13>Feb 25 17:32:18 wopr Use the Force!"), + syslog: event{ + message: "Use the Force!", + hostname: "wopr", + priority: 13, + pid: -1, + month: 2, + day: 25, + hour: 17, + minute: 32, + second: 18, + }, + }, + { + title: "Check relay + ipv6", + log: []byte("<13>Feb 25 17:32:18 2607:f0d0:1002:51::4 Use the Force!"), + syslog: event{ + message: "Use the Force!", + hostname: "2607:f0d0:1002:51::4", + priority: 13, + pid: -1, + month: 2, + day: 25, + hour: 17, + minute: 32, + second: 18, + }, + }, + { + title: "Check relay + ipv6", + log: []byte("<13>Feb 25 17:32:18 2607:f0d0:1002:0051:0000:0000:0000:0004 Use the Force!"), + syslog: event{ + message: "Use the Force!", + hostname: "2607:f0d0:1002:0051:0000:0000:0000:0004", + priority: 13, + pid: -1, + month: 2, + day: 25, + hour: 17, + minute: 32, + second: 18, + }, + }, + { + title: "Number inf the host", + log: []byte("<164>Oct 26 15:19:25 1.2.3.4 ASA1-2: Deny udp src DRAC:10.1.2.3/43434 dst outside:192.168.0.1/53 by access-group \"acl_drac\" [0x0, 0x0]"), + syslog: event{ + message: "Deny udp src DRAC:10.1.2.3/43434 dst outside:192.168.0.1/53 by access-group \"acl_drac\" [0x0, 0x0]", + hostname: "1.2.3.4", + program: "ASA1-2", + priority: 164, + pid: -1, + month: 10, + day: 26, + hour: 15, + minute: 19, + second: 25, + }, + }, + { + log: []byte("<164>Oct 26 15:19:25 1.2.3.4 %ASA1-120: Deny udp src DRAC:10.1.2.3/43434 dst outside:192.168.0.1/53 by access-group \"acl_drac\" [0x0, 0x0]"), + syslog: event{ + message: "Deny udp src DRAC:10.1.2.3/43434 dst outside:192.168.0.1/53 by access-group \"acl_drac\" [0x0, 0x0]", + hostname: "1.2.3.4", + program: "%ASA1-120", + priority: 164, + pid: -1, + month: 10, + day: 26, + hour: 15, + minute: 19, + second: 25, + }, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("%s:%s", test.title, string(test.log)), func(t *testing.T) { + l := newEvent() + Parse(test.log, l) + assert.Equal(t, test.syslog.Message(), l.Message()) + assert.Equal(t, test.syslog.Hostname(), l.Hostname()) + assert.Equal(t, test.syslog.Priority(), l.Priority()) + assert.Equal(t, test.syslog.Pid(), l.Pid()) + assert.Equal(t, test.syslog.Program(), l.Program()) + assert.Equal(t, test.syslog.Month(), l.Month()) + assert.Equal(t, test.syslog.Day(), l.Day()) + assert.Equal(t, test.syslog.Hour(), l.Hour()) + assert.Equal(t, test.syslog.Minute(), l.Minute()) + assert.Equal(t, test.syslog.Second(), l.Second()) + }) + } +} + +func TestDay(t *testing.T) { + for d := 1; d <= 31; d++ { + t.Run(fmt.Sprintf("Day %d", d), func(t *testing.T) { + log := fmt.Sprintf("<34>Oct %2d 22:14:15 mymachine postfix/smtpd[2000]: 'su root' failed for lonvick on /dev/pts/8", d) + l := newEvent() + Parse([]byte(log), l) + assert.Equal(t, d, l.Day()) + }) + } +} + +func TestHour(t *testing.T) { + for d := 0; d <= 23; d++ { + t.Run(fmt.Sprintf("Hour %d", d), func(t *testing.T) { + log := fmt.Sprintf("<34>Oct 11 %02d:14:15 mymachine postfix/smtpd[2000]: 'su root' failed for lonvick on /dev/pts/8", d) + l := newEvent() + Parse([]byte(log), l) + assert.Equal(t, d, l.Hour()) + }) + } +} + +func TestMinute(t *testing.T) { + for d := 0; d <= 59; d++ { + t.Run(fmt.Sprintf("Minute %d", d), func(t *testing.T) { + log := fmt.Sprintf("<34>Oct 11 10:%02d:15 mymachine postfix/smtpd[2000]: 'su root' failed for lonvick on /dev/pts/8", d) + l := newEvent() + Parse([]byte(log), l) + assert.Equal(t, d, l.Minute()) + }) + } +} + +func TestSecond(t *testing.T) { + for d := 0; d <= 59; d++ { + t.Run(fmt.Sprintf("Second %d", d), func(t *testing.T) { + log := fmt.Sprintf("<34>Oct 11 10:15:%02d mymachine postfix/smtpd[2000]: 'su root' failed for lonvick on /dev/pts/8", d) + l := newEvent() + Parse([]byte(log), l) + assert.Equal(t, d, l.Second()) + }) + } +} + +func TestPriority(t *testing.T) { + for d := 1; d <= 120; d++ { + t.Run(fmt.Sprintf("Priority %d", d), func(t *testing.T) { + log := fmt.Sprintf("<%d>Oct 11 10:15:15 mymachine postfix/smtpd[2000]: 'su root' failed for lonvick on /dev/pts/8", d) + l := newEvent() + Parse([]byte(log), l) + assert.Equal(t, d, l.Priority()) + }) + return + } +} + +var e *event + +func BenchmarkParser(b *testing.B) { + b.ReportAllocs() + l := newEvent() + log := []byte("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8") + for n := 0; n < b.N; n++ { + Parse(log, l) + e = l + } +} diff --git a/filebeat/input/syslog/syslog_rfc3164.rl b/filebeat/input/syslog/syslog_rfc3164.rl new file mode 100644 index 000000000000..ea0f632bba7c --- /dev/null +++ b/filebeat/input/syslog/syslog_rfc3164.rl @@ -0,0 +1,45 @@ +%%{ + machine syslog_rfc3164; + + # General + brackets = "[" | "]"; + + # Priority + # Ref: https://tools.ietf.org/html/rfc3164#section-4.1.1 + # Match: "<123>" + priority = digit{1,5}>tok %priority; + prio = "<" priority ">"; + + # Header + # Timestamp + # https://tools.ietf.org/html/rfc3164#section-4.1.2 + # Match: "Jan" and "January" + month = ( "Jan" ("uary")? | "Feb" "ruary"? | "Mar" "ch"? | "Apr" "il"? | "Ma" "y"? | "Jun" "e"? | "Jul" "y"? | "Aug" "ust"? | "Sep" ("tember")? | "Oct" "ober"? | "Nov" "ember"? | "ec" "ember"?) >tok %month; + + # Match: " 5" and "10" as the day + multiple_digits_day = (([12][0-9]) | ("3"[01]))>tok %day; + single_digit_day = [1-9]>tok %day; + day = (space? single_digit_day | multiple_digits_day); + + # Match: hh:mm:ss (24 hr format) + hour = ([01][0-9]|"2"[0-3])>tok %hour; + minute = ([0-5][0-9])>tok %minute; + second = ([0-5][0-9])>tok %second; + nanosecond = digit+; + time = hour ":" minute ":" second ("." nanosecond)?; + timestamp = month space day space time; + + hostname = [a-zA-Z0-9.-_:]+>tok %hostname; + header = timestamp space hostname space; + + # MSG + # https://tools.ietf.org/html/rfc3164#section-4.1.3 + program = (extend -space -brackets)+>tok %program; + pid = digit+>tok %pid; + syslogprog = program ("[" pid "]")? ":" space; + message = any+>tok %message; + msg = syslogprog? message>tok %message; + + main := (prio)? (header msg | timestamp space message | message); + +}%% diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 061b4318deeb..96f66ed4780e 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -7,6 +7,7 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/filebeat/inputsource/tcp" "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" @@ -53,12 +54,12 @@ func NewInput( return nil, err } - cb := func(data []byte, metadata tcp.Metadata) { + cb := func(data []byte, metadata inputsource.NetworkMetadata) { event := createEvent(data, metadata) forwarder.Send(event) } - server, err := tcp.New(cb, &config.Config) + server, err := tcp.New(&config.Config, cb) if err != nil { return nil, err } @@ -103,7 +104,7 @@ func (p *Input) Wait() { p.Stop() } -func createEvent(raw []byte, metadata tcp.Metadata) *util.Data { +func createEvent(raw []byte, metadata inputsource.NetworkMetadata) *util.Data { data := util.NewData() data.Event = beat.Event{ Timestamp: time.Now(), diff --git a/filebeat/input/tcp/input_test.go b/filebeat/input/tcp/input_test.go index 80a32f7a9e50..fde5100ab974 100644 --- a/filebeat/input/tcp/input_test.go +++ b/filebeat/input/tcp/input_test.go @@ -6,7 +6,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/filebeat/inputsource/tcp" + "github.com/elastic/beats/filebeat/inputsource" ) func TestCreateEvent(t *testing.T) { @@ -16,7 +16,7 @@ func TestCreateEvent(t *testing.T) { addr := &net.IPAddr{IP: parsedIP, Zone: ""} message := []byte(hello) - mt := tcp.Metadata{RemoteAddr: addr} + mt := inputsource.NetworkMetadata{RemoteAddr: addr} data := createEvent(message, mt) event := data.GetEvent() diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index bb3d62cbb121..0f6ff183c887 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -7,6 +7,7 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input" + "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/filebeat/inputsource/udp" "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/beat" @@ -49,7 +50,7 @@ func NewInput( } forwarder := harvester.NewForwarder(out) - callback := func(data []byte, metadata udp.Metadata) { + callback := func(data []byte, metadata inputsource.NetworkMetadata) { e := util.NewData() e.Event = beat.Event{ Timestamp: time.Now(), diff --git a/filebeat/inputsource/network.go b/filebeat/inputsource/network.go new file mode 100644 index 000000000000..699a26f20fe4 --- /dev/null +++ b/filebeat/inputsource/network.go @@ -0,0 +1,20 @@ +package inputsource + +import ( + "net" +) + +// Network interface implemented by TCP and UDP input source. +type Network interface { + Start() error + Stop() +} + +// NetworkMetadata defines common information that we can retrieve from a remote connection. +type NetworkMetadata struct { + RemoteAddr net.Addr + Truncated bool +} + +// NetworkFunc defines callback executed when a new event is received from a network source. +type NetworkFunc = func(data []byte, metadata NetworkMetadata) diff --git a/filebeat/inputsource/network_metadata.go b/filebeat/inputsource/network_metadata.go new file mode 100644 index 000000000000..e283490e5be9 --- /dev/null +++ b/filebeat/inputsource/network_metadata.go @@ -0,0 +1 @@ +package inputsource diff --git a/filebeat/inputsource/tcp/client.go b/filebeat/inputsource/tcp/client.go index cee3a66b83f5..7d2e0aa124d5 100644 --- a/filebeat/inputsource/tcp/client.go +++ b/filebeat/inputsource/tcp/client.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" + "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/libbeat/logp" ) @@ -14,9 +15,9 @@ import ( type client struct { conn net.Conn log *logp.Logger - callback CallbackFunc + callback inputsource.NetworkFunc done chan struct{} - metadata Metadata + metadata inputsource.NetworkMetadata splitFunc bufio.SplitFunc maxMessageSize uint64 timeout time.Duration @@ -25,7 +26,7 @@ type client struct { func newClient( conn net.Conn, log *logp.Logger, - callback CallbackFunc, + callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc, maxReadMessage uint64, timeout time.Duration, @@ -38,7 +39,7 @@ func newClient( splitFunc: splitFunc, maxMessageSize: maxReadMessage, timeout: timeout, - metadata: Metadata{ + metadata: inputsource.NetworkMetadata{ RemoteAddr: conn.RemoteAddr(), }, } diff --git a/filebeat/inputsource/tcp/config.go b/filebeat/inputsource/tcp/config.go index 9dbd98441bb6..afce4d1940af 100644 --- a/filebeat/inputsource/tcp/config.go +++ b/filebeat/inputsource/tcp/config.go @@ -7,6 +7,11 @@ import ( "github.com/elastic/beats/libbeat/common/cfgtype" ) +// Name is the human readable name and identifier. +const Name = "tcp" + +type size uint64 + // Config exposes the tcp configuration. type Config struct { Host string `config:"host"` diff --git a/filebeat/inputsource/tcp/server.go b/filebeat/inputsource/tcp/server.go index 56bafa953219..66bccc1d16f2 100644 --- a/filebeat/inputsource/tcp/server.go +++ b/filebeat/inputsource/tcp/server.go @@ -7,21 +7,14 @@ import ( "net" "sync" + "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/libbeat/logp" ) -// Metadata information about the remote host. -type Metadata struct { - RemoteAddr net.Addr -} - -// CallbackFunc receives new events read from the TCP socket. -type CallbackFunc = func(data []byte, metadata Metadata) - // Server represent a TCP server type Server struct { sync.RWMutex - callback CallbackFunc + callback inputsource.NetworkFunc config *Config Listener net.Listener clients map[*client]struct{} @@ -33,8 +26,8 @@ type Server struct { // New creates a new tcp server func New( - callback CallbackFunc, config *Config, + callback inputsource.NetworkFunc, ) (*Server, error) { if len(config.LineDelimiter) == 0 { diff --git a/filebeat/inputsource/tcp/server_test.go b/filebeat/inputsource/tcp/server_test.go index 7aaf200ece8c..19a52976f892 100644 --- a/filebeat/inputsource/tcp/server_test.go +++ b/filebeat/inputsource/tcp/server_test.go @@ -11,6 +11,7 @@ import ( "github.com/dustin/go-humanize" "github.com/stretchr/testify/assert" + "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/libbeat/common" ) @@ -22,7 +23,7 @@ var defaultConfig = Config{ type info struct { message string - mt Metadata + mt inputsource.NetworkMetadata } func TestErrorOnEmptyLineDelimiter(t *testing.T) { @@ -134,7 +135,7 @@ func TestReceiveEventsAndMetadata(t *testing.T) { t.Run(test.name, func(t *testing.T) { ch := make(chan *info, len(test.expectedMessages)) defer close(ch) - to := func(message []byte, mt Metadata) { + to := func(message []byte, mt inputsource.NetworkMetadata) { ch <- &info{message: string(message), mt: mt} } test.cfg["host"] = "localhost:0" @@ -144,7 +145,7 @@ func TestReceiveEventsAndMetadata(t *testing.T) { if !assert.NoError(t, err) { return } - server, err := New(to, &config) + server, err := New(&config, to) if !assert.NoError(t, err) { return } @@ -182,7 +183,7 @@ func TestReceiveNewEventsConcurrently(t *testing.T) { eventsCount := 100 ch := make(chan *info, eventsCount*workers) defer close(ch) - to := func(message []byte, mt Metadata) { + to := func(message []byte, mt inputsource.NetworkMetadata) { ch <- &info{message: string(message), mt: mt} } cfg, err := common.NewConfigFrom(map[string]interface{}{"host": ":0"}) @@ -194,7 +195,7 @@ func TestReceiveNewEventsConcurrently(t *testing.T) { if !assert.NoError(t, err) { return } - server, err := New(to, &config) + server, err := New(&config, to) if !assert.NoError(t, err) { return } diff --git a/filebeat/inputsource/udp/server.go b/filebeat/inputsource/udp/server.go index 0e55e50c2b47..e5cf5e93d71f 100644 --- a/filebeat/inputsource/udp/server.go +++ b/filebeat/inputsource/udp/server.go @@ -7,24 +7,22 @@ import ( "sync" "time" + "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/libbeat/logp" ) +// Name is the human readable name and identifier. +const Name = "udp" + const windowErrBuffer = "A message sent on a datagram socket was larger than the internal message" + " buffer or some other network limit, or the buffer used to receive a datagram into was smaller" + " than the datagram itself." -// Metadata contains formations about the packet. -type Metadata struct { - RemoteAddr net.Addr - Truncated bool -} - // Server creates a simple UDP Server and listen to a specific host:port and will send any // event received to the callback method. type Server struct { config *Config - callback func(data []byte, mt Metadata) + callback inputsource.NetworkFunc Listener net.PacketConn log *logp.Logger wg sync.WaitGroup @@ -32,7 +30,7 @@ type Server struct { } // New returns a new UDPServer instance. -func New(config *Config, callback func(data []byte, mt Metadata)) *Server { +func New(config *Config, callback inputsource.NetworkFunc) *Server { return &Server{ config: config, callback: callback, @@ -86,13 +84,13 @@ func (u *Server) run() { // On Windows send the current buffer and mark it as truncated. // The buffer will have content but length will return 0, addr will be nil. if isLargerThanBuffer(err) { - u.callback(buffer, Metadata{RemoteAddr: addr, Truncated: true}) + u.callback(buffer, inputsource.NetworkMetadata{RemoteAddr: addr, Truncated: true}) continue } } if length > 0 { - u.callback(buffer[:length], Metadata{RemoteAddr: addr}) + u.callback(buffer[:length], inputsource.NetworkMetadata{RemoteAddr: addr}) } } } diff --git a/filebeat/inputsource/udp/server_test.go b/filebeat/inputsource/udp/server_test.go index 05ddd2e113d0..9c72a95661d2 100644 --- a/filebeat/inputsource/udp/server_test.go +++ b/filebeat/inputsource/udp/server_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/filebeat/inputsource" ) const maxMessageSize = 20 @@ -14,7 +16,7 @@ const timeout = time.Second * 15 type info struct { message []byte - mt Metadata + mt inputsource.NetworkMetadata } func TestReceiveEventFromUDP(t *testing.T) { @@ -38,7 +40,7 @@ func TestReceiveEventFromUDP(t *testing.T) { ch := make(chan info) host := "localhost:0" config := &Config{Host: host, MaxMessageSize: maxMessageSize, Timeout: timeout} - fn := func(message []byte, metadata Metadata) { + fn := func(message []byte, metadata inputsource.NetworkMetadata) { ch <- info{message: message, mt: metadata} } s := New(config, fn) diff --git a/filebeat/tests/system/test_syslog.py b/filebeat/tests/system/test_syslog.py new file mode 100644 index 000000000000..d1a3b371ec71 --- /dev/null +++ b/filebeat/tests/system/test_syslog.py @@ -0,0 +1,104 @@ +from filebeat import BaseTest +import socket + + +class Test(BaseTest): + """ + Test filebeat with the syslog input + """ + + def test_syslog_with_tcp(self): + """ + Test syslog input with events from TCP. + """ + host = "127.0.0.1" + port = 8080 + input_raw = """ +- type: syslog + protocol: + tcp: + host: "{}:{}" +""" + + input_raw = input_raw.format(host, port) + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for TCP connection")) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP + sock.connect((host, port)) + + for n in range(0, 2): + m = "<13>Oct 11 22:14:15 wopr.mymachine.co postfix/smtpd[2000]:" \ + " 'su root' failed for lonvick on /dev/pts/8 {}\n" + m = m.format(n) + sock.send(m) + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + self.assert_syslog(output[0]) + sock.close() + + def test_syslog_with_udp(self): + """ + Test syslog input with events from TCP. + """ + host = "127.0.0.1" + port = 8080 + input_raw = """ +- type: syslog + protocol: + udp: + host: "{}:{}" +""" + + input_raw = input_raw.format(host, port) + self.render_config_template( + input_raw=input_raw, + inputs=False, + ) + + filebeat = self.start_beat() + + self.wait_until(lambda: self.log_contains("Started listening for UDP connection")) + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # TCP + + for n in range(0, 2): + m = "<13>Oct 11 22:14:15 wopr.mymachine.co postfix/smtpd[2000]:" \ + " 'su root' failed for lonvick on /dev/pts/8 {}\n" + m = m.format(n) + sock.sendto(m, (host, port)) + + self.wait_until(lambda: self.output_count(lambda x: x >= 2)) + + filebeat.check_kill_and_wait() + + output = self.read_output() + + assert len(output) == 2 + self.assert_syslog(output[0]) + + def assert_syslog(self, syslog): + assert syslog["prospector.type"] == "syslog" + assert syslog["event.severity"] == 5 + assert syslog["hostname"] == "wopr.mymachine.co" + assert syslog["input.type"] == "syslog" + assert syslog["message"] == "'su root' failed for lonvick on /dev/pts/8 0" + assert syslog["process.pid"] == 2000 + assert syslog["process.program"] == "postfix/smtpd" + assert syslog["syslog.facility"] == 1 + assert syslog["syslog.priority"] == 13 + assert syslog["syslog.severity_label"] == "Notice" + assert syslog["syslog.facility_label"] == "user-level" + assert len(syslog["source"]) > 0