Skip to content

Commit

Permalink
feat(common.socket): Allow parallel parsing with a pool of workers (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman authored and asaharn committed Oct 16, 2024
1 parent 4bc32be commit c7a872c
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 101 deletions.
136 changes: 76 additions & 60 deletions plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"strings"
"sync"

"github.com/alitto/pond"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
)

Expand All @@ -24,10 +27,19 @@ type packetListener struct {
ReadBufferSize int
Log telegraf.Logger

conn net.PacketConn
decoder internal.ContentDecoder
path string
wg sync.WaitGroup
conn net.PacketConn
decoders sync.Pool
path string
wg sync.WaitGroup
parsePool *pond.WorkerPool
}

func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int) *packetListener {
return &packetListener{
Encoding: encoding,
MaxDecompressionSize: int64(maxDecompressionSize),
parsePool: pond.New(maxWorkers, 0, pond.MinWorkers(maxWorkers/2+1)),
}
}

func (l *packetListener) listenData(onData CallbackData, onError CallbackError) {
Expand All @@ -48,15 +60,22 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
break
}

body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
d := make([]byte, n)
copy(d, buf[:n])
l.parsePool.Submit(func() {
decoder := l.decoders.Get().(internal.ContentDecoder)
defer l.decoders.Put(decoder)
body, err := decoder.Decode(d)
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}

if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
onData(src, body)
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

onData(src, body)
})
}
}()
}
Expand All @@ -80,26 +99,34 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr
break
}

// Decode the contents depending on the given encoding
body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
d := make([]byte, n)
copy(d, buf[:n])
l.parsePool.Submit(func() {
// Decode the contents depending on the given encoding
decoder := l.decoders.Get().(internal.ContentDecoder)
// Not possible to immediately return the decoder to the Pool after calling Decode, because some
// decoders return a reference to their internal buffers. This would cause data races.
defer l.decoders.Put(decoder)
body, err := decoder.Decode(d[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}

// Workaround to provide remote endpoints for Unix-type sockets
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
// Workaround to provide remote endpoints for Unix-type sockets
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

// Create a pipe and notify the caller via Callback that new data is
// available. Afterwards write the data. Please note: Write() will
// blocks until all data is consumed!
reader, writer := io.Pipe()
go onConnection(src, reader)
if _, err := writer.Write(body); err != nil && onError != nil {
onError(err)
}
writer.Close()
// Create a pipe and notify the caller via Callback that new data is
// available. Afterwards write the data. Please note: Write() will
// block until all data is consumed!
reader, writer := io.Pipe()
go onConnection(src, reader)
if _, err := writer.Write(body); err != nil && onError != nil {
onError(err)
}
writer.Close()
})
}
}()
}
Expand Down Expand Up @@ -133,18 +160,7 @@ func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error {
}
}

// Create a decoder for the given encoding
var options []internal.DecodingOption
if l.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
}
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)
}
l.decoder = decoder

return nil
return l.setupDecoder()
}

func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) error {
Expand Down Expand Up @@ -179,39 +195,37 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err
l.Log.Warnf("Setting read buffer on %s socket failed: %v", u.Scheme, err)
}
}
l.conn = conn

// Create a decoder for the given encoding
var options []internal.DecodingOption
if l.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
}
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)
}
l.decoder = decoder

return nil
l.conn = conn
return l.setupDecoder()
}

func (l *packetListener) setupIP(u *url.URL) error {
conn, err := net.ListenPacket(u.Scheme, u.Host)
if err != nil {
return fmt.Errorf("listening (ip) failed: %w", err)
}

l.conn = conn
return l.setupDecoder()
}

func (l *packetListener) setupDecoder() error {
// Create a decoder for the given encoding
var options []internal.DecodingOption
if l.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
}
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)
}
l.decoder = decoder

l.decoders = sync.Pool{New: func() any {
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
l.Log.Errorf("creating decoder failed: %v", err)
return nil
}

return decoder
}}

return nil
}
Expand All @@ -237,5 +251,7 @@ func (l *packetListener) close() error {
}
}

l.parsePool.StopAndWait()

return nil
}
59 changes: 20 additions & 39 deletions plugins/common/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
SocketMode string `toml:"socket_mode"`
ContentEncoding string `toml:"content_encoding"`
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
MaxParallelParsers int `toml:"max_parallel_parsers"`
common_tls.ServerConfig
}

Expand Down Expand Up @@ -96,74 +97,54 @@ func (cfg *Config) NewSocket(address string, splitcfg *SplitConfig, logger teleg
}

func (s *Socket) Setup() error {
s.MaxParallelParsers = max(s.MaxParallelParsers, 1)
switch s.url.Scheme {
case "tcp", "tcp4", "tcp6":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
s.Config,
s.splitter,
s.log,
)

if err := l.setupTCP(s.url, s.tlsCfg); err != nil {
return err
}
s.listener = l
case "unix", "unixpacket":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
s.Config,
s.splitter,
s.log,
)

if err := l.setupUnix(s.url, s.tlsCfg, s.SocketMode); err != nil {
return err
}
s.listener = l
case "udp", "udp4", "udp6":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil {
return err
}
s.listener = l
case "ip", "ip4", "ip6":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupIP(s.url); err != nil {
return err
}
s.listener = l
case "unixgram":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupUnixgram(s.url, s.SocketMode); err != nil {
return err
}
s.listener = l
case "vsock":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
s.Config,
s.splitter,
s.log,
)

if err := l.setupVsock(s.url); err != nil {
return err
Expand Down
34 changes: 32 additions & 2 deletions plugins/common/socket/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"syscall"
"time"

"github.com/alitto/pond"
"github.com/mdlayher/vsock"

"github.com/influxdata/telegraf"
Expand All @@ -43,11 +44,29 @@ type streamListener struct {
connections uint64
path string
cancel context.CancelFunc
parsePool *pond.WorkerPool

wg sync.WaitGroup
sync.Mutex
}

func newStreamListener(conf Config, splitter bufio.SplitFunc, log telegraf.Logger) *streamListener {
return &streamListener{
ReadBufferSize: int(conf.ReadBufferSize),
ReadTimeout: conf.ReadTimeout,
KeepAlivePeriod: conf.KeepAlivePeriod,
MaxConnections: conf.MaxConnections,
Encoding: conf.ContentEncoding,
Splitter: splitter,
Log: log,

parsePool: pond.New(
conf.MaxParallelParsers,
0,
pond.MinWorkers(conf.MaxParallelParsers/2+1)),
}
}

func (l *streamListener) setupTCP(u *url.URL, tlsCfg *tls.Config) error {
var err error
if tlsCfg == nil {
Expand Down Expand Up @@ -216,6 +235,9 @@ func (l *streamListener) close() error {
return err
}
}

l.parsePool.StopAndWait()

return nil
}

Expand Down Expand Up @@ -334,8 +356,13 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unix"}
}

data := scanner.Bytes()
onData(src, data)
d := make([]byte, len(data))
copy(d, data)
l.parsePool.Submit(func() {
onData(src, d)
})
}

if err := scanner.Err(); err != nil {
Expand Down Expand Up @@ -379,7 +406,10 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error {
if err != nil {
return fmt.Errorf("read on %s failed: %w", src, err)
}
onData(src, buf)

l.parsePool.Submit(func() {
onData(src, buf)
})

return nil
}
Expand Down

0 comments on commit c7a872c

Please sign in to comment.