|
1 | 1 | package syslog
|
2 | 2 |
|
3 | 3 | import (
|
| 4 | + "bytes" |
4 | 5 | "context"
|
5 | 6 | "crypto/tls"
|
6 | 7 | "crypto/x509"
|
@@ -380,16 +381,32 @@ func (t *UDPTransport) handleRcv(c *ConnPipe) {
|
380 | 381 | defer t.openConnections.Done()
|
381 | 382 |
|
382 | 383 | lbs := t.connectionLabels(c.addr.String())
|
383 |
| - err := syslogparser.ParseStream(c, func(result *syslog.Result) { |
384 |
| - if err := result.Error; err != nil { |
385 |
| - t.handleMessageError(err) |
386 |
| - } else { |
387 |
| - t.handleMessage(lbs.Copy(), result.Message) |
| 384 | + |
| 385 | + for { |
| 386 | + datagram := make([]byte, t.maxMessageLength()) |
| 387 | + n, err := c.Read(datagram) |
| 388 | + if err != nil { |
| 389 | + if err == io.EOF { |
| 390 | + break |
| 391 | + } |
| 392 | + |
| 393 | + level.Warn(t.logger).Log("msg", "error reading from pipe", "err", err) |
| 394 | + continue |
388 | 395 | }
|
389 |
| - }, t.maxMessageLength()) |
390 | 396 |
|
391 |
| - if err != nil { |
392 |
| - level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) |
| 397 | + r := bytes.NewReader(datagram[:n]) |
| 398 | + |
| 399 | + err = syslogparser.ParseStream(r, func(result *syslog.Result) { |
| 400 | + if err := result.Error; err != nil { |
| 401 | + t.handleMessageError(err) |
| 402 | + } else { |
| 403 | + t.handleMessage(lbs.Copy(), result.Message) |
| 404 | + } |
| 405 | + }, t.maxMessageLength()) |
| 406 | + |
| 407 | + if err != nil { |
| 408 | + level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err) |
| 409 | + } |
393 | 410 | }
|
394 | 411 | }
|
395 | 412 |
|
|
0 commit comments