Skip to content

Commit 0320106

Browse files
committed
promtail: add support for RFC3164 syslogs
This adds support for RFC3164 syslogs which are used by many devices, in my use case OpenWrt and Ubiquiti routers. A new option called `is_rfc3164_message` is added to the syslog which makes the incomming logs to be handled as RFC3164. Co-Authored-By: Paul Spooren <[email protected]> Signed-off-by: Kirill A. Korinsky <[email protected]>
1 parent 1a2ebf7 commit 0320106

File tree

6 files changed

+85
-14
lines changed

6 files changed

+85
-14
lines changed

clients/pkg/promtail/scrapeconfig/scrapeconfig.go

+3
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,9 @@ type SyslogTargetConfig struct {
202202
// message should be pushed to Loki
203203
UseRFC5424Message bool `yaml:"use_rfc5424_message"`
204204

205+
// IsRFC3164Message defines wether the log is formated as RFC3164
206+
IsRFC3164Message bool `yaml:"is_rfc3164_message"`
207+
205208
// MaxMessageLength sets the maximum limit to the length of syslog messages
206209
MaxMessageLength int `yaml:"max_message_length"`
207210

clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
// the callback function with the parsed messages. The parser automatically
1515
// detects octet counting.
1616
// The function returns on EOF or unrecoverable errors.
17-
func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error {
17+
func ParseStream(isRFC3164Message bool, r io.Reader, callback func(res *syslog.Result), maxMessageLength int) error {
1818
buf := bufio.NewReaderSize(r, 1<<10)
1919

2020
b, err := buf.ReadByte()
@@ -24,9 +24,17 @@ func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLengt
2424
_ = buf.UnreadByte()
2525

2626
if b == '<' {
27-
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
27+
if isRFC3164Message {
28+
nontransparent.NewParserRFC3164(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
29+
} else {
30+
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
31+
}
2832
} else if b >= '0' && b <= '9' {
29-
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
33+
if isRFC3164Message {
34+
octetcounting.NewParserRFC3164(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
35+
} else {
36+
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
37+
}
3038
} else {
3139
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", string(b))
3240
}

clients/pkg/promtail/targets/syslog/syslogparser/syslogparser_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestParseStream_OctetCounting(t *testing.T) {
2424
results = append(results, res)
2525
}
2626

27-
err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
27+
err := syslogparser.ParseStream(false, r, cb, defaultMaxMessageLength)
2828
require.NoError(t, err)
2929

3030
require.Equal(t, 2, len(results))
@@ -43,7 +43,7 @@ func TestParseStream_ValidParseError(t *testing.T) {
4343
results = append(results, res)
4444
}
4545

46-
err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
46+
err := syslogparser.ParseStream(false, r, cb, defaultMaxMessageLength)
4747
require.NoError(t, err)
4848

4949
require.Equal(t, 1, len(results))
@@ -59,7 +59,7 @@ func TestParseStream_OctetCounting_LongMessage(t *testing.T) {
5959
results = append(results, res)
6060
}
6161

62-
err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
62+
err := syslogparser.ParseStream(false, r, cb, defaultMaxMessageLength)
6363
require.NoError(t, err)
6464

6565
require.Equal(t, 1, len(results))
@@ -74,7 +74,7 @@ func TestParseStream_NewlineSeparated(t *testing.T) {
7474
results = append(results, res)
7575
}
7676

77-
err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
77+
err := syslogparser.ParseStream(false, r, cb, defaultMaxMessageLength)
7878
require.NoError(t, err)
7979

8080
require.Equal(t, 2, len(results))
@@ -87,13 +87,13 @@ func TestParseStream_NewlineSeparated(t *testing.T) {
8787
func TestParseStream_InvalidStream(t *testing.T) {
8888
r := strings.NewReader("invalid")
8989

90-
err := syslogparser.ParseStream(r, func(res *syslog.Result) {}, defaultMaxMessageLength)
90+
err := syslogparser.ParseStream(false, r, func(res *syslog.Result) {}, defaultMaxMessageLength)
9191
require.EqualError(t, err, "invalid or unsupported framing. first byte: 'i'")
9292
}
9393

9494
func TestParseStream_EmptyStream(t *testing.T) {
9595
r := strings.NewReader("")
9696

97-
err := syslogparser.ParseStream(r, func(res *syslog.Result) {}, defaultMaxMessageLength)
97+
err := syslogparser.ParseStream(false, r, func(res *syslog.Result) {}, defaultMaxMessageLength)
9898
require.Equal(t, err, io.EOF)
9999
}

clients/pkg/promtail/targets/syslog/syslogtarget.go

+60-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/go-kit/log/level"
1212
"github.com/leodido/go-syslog/v4"
1313
"github.com/leodido/go-syslog/v4/rfc3164"
14+
"github.com/leodido/go-syslog/v4/rfc5424"
1415
"github.com/prometheus/common/model"
1516
"github.com/prometheus/prometheus/model/labels"
1617
"github.com/prometheus/prometheus/model/relabel"
@@ -106,7 +107,7 @@ func (t *SyslogTarget) handleMessageError(err error) {
106107
t.metrics.syslogParsingErrors.Inc()
107108
}
108109

109-
func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) {
110+
func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg syslog.Message) {
110111
rfc5424Msg := msg.(*rfc5424.SyslogMessage)
111112

112113
if rfc5424Msg.Message == nil {
@@ -173,6 +174,64 @@ func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Messag
173174
t.messages <- message{filtered, m, timestamp}
174175
}
175176

177+
func (t *SyslogTarget) handleMessageRFC3164(connLabels labels.Labels, msg syslog.Message) {
178+
rfc3164Msg := msg.(*rfc3164.SyslogMessage)
179+
180+
if rfc3164Msg.Message == nil {
181+
t.metrics.syslogEmptyMessages.Inc()
182+
return
183+
}
184+
185+
lb := labels.NewBuilder(connLabels)
186+
if v := rfc3164Msg.SeverityLevel(); v != nil {
187+
lb.Set("__syslog_message_severity", *v)
188+
}
189+
if v := rfc3164Msg.FacilityLevel(); v != nil {
190+
lb.Set("__syslog_message_facility", *v)
191+
}
192+
if v := rfc3164Msg.Hostname; v != nil {
193+
lb.Set("__syslog_message_hostname", *v)
194+
}
195+
if v := rfc3164Msg.Appname; v != nil {
196+
lb.Set("__syslog_message_app_name", *v)
197+
}
198+
if v := rfc3164Msg.ProcID; v != nil {
199+
lb.Set("__syslog_message_proc_id", *v)
200+
}
201+
if v := rfc3164Msg.MsgID; v != nil {
202+
lb.Set("__syslog_message_msg_id", *v)
203+
}
204+
205+
processed, _ := relabel.Process(lb.Labels(), t.relabelConfig...)
206+
207+
filtered := make(model.LabelSet)
208+
for _, lbl := range processed {
209+
if strings.HasPrefix(lbl.Name, "__") {
210+
continue
211+
}
212+
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
213+
}
214+
215+
var timestamp time.Time
216+
if t.config.UseIncomingTimestamp && rfc3164Msg.Timestamp != nil {
217+
timestamp = *rfc3164Msg.Timestamp
218+
} else {
219+
timestamp = time.Now()
220+
}
221+
222+
m := *rfc3164Msg.Message
223+
224+
t.messages <- message{filtered, m, timestamp}
225+
}
226+
227+
func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) {
228+
if t.config.IsRFC3164Message {
229+
t.handleMessageRFC3164(connLabels, msg)
230+
} else {
231+
t.handleMessageRFC5424(connLabels, msg)
232+
}
233+
}
234+
176235
func (t *SyslogTarget) messageSender(entries chan<- api.Entry) {
177236
for msg := range t.messages {
178237
entries <- api.Entry{

clients/pkg/promtail/targets/syslog/syslogtarget_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -916,7 +916,7 @@ func TestParseStream_WithAsyncPipe(t *testing.T) {
916916
results = append(results, res)
917917
}
918918

919-
err := syslogparser.ParseStream(pipe, cb, defaultMaxMessageLength)
919+
err := syslogparser.ParseStream(false, pipe, cb, defaultMaxMessageLength)
920920
require.NoError(t, err)
921921
require.Equal(t, 3, len(results))
922922
}

clients/pkg/promtail/targets/syslog/transport.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ func (t *TCPTransport) handleConnection(cn net.Conn) {
272272

273273
lbs := t.connectionLabels(ipFromConn(c).String())
274274

275-
err := syslogparser.ParseStream(c, func(result *syslog.Result) {
275+
err := syslogparser.ParseStream(t.config.IsRFC3164Message, c, func(result *syslog.Result) {
276276
if err := result.Error; err != nil {
277277
t.handleMessageError(err)
278278
return
@@ -380,7 +380,8 @@ func (t *UDPTransport) acceptPackets() {
380380
func (t *UDPTransport) handleRcv(c *ConnPipe) {
381381
defer t.openConnections.Done()
382382

383-
lbs := t.connectionLabels(c.addr.String())
383+
udpAddr, _ := net.ResolveUDPAddr("udp", c.addr.String())
384+
lbs := t.connectionLabels(udpAddr.IP.String())
384385

385386
for {
386387
datagram := make([]byte, t.maxMessageLength())
@@ -396,7 +397,7 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) {
396397

397398
r := bytes.NewReader(datagram[:n])
398399

399-
err = syslogparser.ParseStream(r, func(result *syslog.Result) {
400+
err = syslogparser.ParseStream(t.config.IsRFC3164Message, r, func(result *syslog.Result) {
400401
if err := result.Error; err != nil {
401402
t.handleMessageError(err)
402403
} else {

0 commit comments

Comments
 (0)