Skip to content

Commit

Permalink
add one_log_per_packet flag to TCP receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
haimrubinstein committed Jul 6, 2023
1 parent eeae0f9 commit c5ead69
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 42 deletions.
2 changes: 1 addition & 1 deletion .chloggen/add-udp-tokenize-flag.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ change_type: enhancement
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add option to skip log tokenization. use the 'one_log_per_packet' setting to skip log tokenization if multiline is not used.
note: Add option to skip log tokenization for both tcp and udp receivers. use the 'one_log_per_packet' setting to skip log tokenization if multiline is not used.

# One or more tracking issues related to the change
issues: [23440]
Expand Down
115 changes: 74 additions & 41 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ package tcp // import "github.com/open-telemetry/opentelemetry-collector-contrib

import (
"bufio"
"bytes"
"context"
"crypto/rand"
"crypto/tls"
"fmt"
"io"
"net"
"strconv"
"sync"
Expand Down Expand Up @@ -48,8 +50,9 @@ func NewConfigWithID(operatorID string) *Config {
return &Config{
InputConfig: helper.NewInputConfig(operatorID, operatorType),
BaseConfig: BaseConfig{
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
OneLogPerPacket: false,
Multiline: helper.NewMultilineConfig(),
Encoding: helper.NewEncodingConfig(),
},
}
}
Expand All @@ -66,6 +69,7 @@ type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
TLS *configtls.TLSServerSetting `mapstructure:"tls,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
Expand Down Expand Up @@ -114,12 +118,13 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

tcpInput := &Input{
InputOperator: inputOperator,
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
encoding: encoding,
splitFunc: splitFunc,
InputOperator: inputOperator,
address: c.ListenAddress,
MaxLogSize: int(c.MaxLogSize),
addAttributes: c.AddAttributes,
OneLogPerPacket: c.OneLogPerPacket,
encoding: encoding,
splitFunc: splitFunc,
backoff: backoff.Backoff{
Max: 3 * time.Second,
},
Expand All @@ -139,9 +144,10 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
// Input is an operator that listens for log entries over tcp.
type Input struct {
helper.InputOperator
address string
MaxLogSize int
addAttributes bool
address string
MaxLogSize int
addAttributes bool
OneLogPerPacket bool

listener net.Listener
cancel context.CancelFunc
Expand Down Expand Up @@ -239,48 +245,75 @@ func (t *Input) goHandleMessages(ctx context.Context, conn net.Conn, cancel cont
defer t.wg.Done()
defer cancel()

if t.OneLogPerPacket {
var buf bytes.Buffer
io.Copy(&buf, conn)
log := truncateMaxLog(buf.Bytes(), t.MaxLogSize)
handleMessage(ctx, conn, t, log)
return
}

buf := make([]byte, 0, t.MaxLogSize)

scanner := bufio.NewScanner(conn)
scanner.Buffer(buf, t.MaxLogSize)

scanner.Split(t.splitFunc)

for scanner.Scan() {
decoded, err := t.encoding.Decode(scanner.Bytes())
if err != nil {
t.Errorw("Failed to decode data", zap.Error(err))
continue
}
handleMessage(ctx, conn, t, scanner.Bytes())
}

entry, err := t.NewEntry(string(decoded))
if err != nil {
t.Errorw("Failed to create entry", zap.Error(err))
continue
}
if err := scanner.Err(); err != nil {
t.Errorw("Scanner error", zap.Error(err))
}
}()
}

if t.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIP(ip))
}
func handleMessage(ctx context.Context, conn net.Conn, t *Input, log []byte) {
decoded, err := t.encoding.Decode(log)
if err != nil {
t.Errorw("Failed to decode data", zap.Error(err))
return
}

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", t.resolver.GetHostFromIP(ip))
}
}
entry, err := t.NewEntry(string(decoded))
if err != nil {
t.Errorw("Failed to create entry", zap.Error(err))
return
}

t.Write(ctx, entry)
if t.addAttributes {
entry.AddAttribute("net.transport", "IP.TCP")
if addr, ok := conn.RemoteAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.peer.ip", ip)
entry.AddAttribute("net.peer.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.peer.name", t.resolver.GetHostFromIP(ip))
}
if err := scanner.Err(); err != nil {
t.Errorw("Scanner error", zap.Error(err))

if addr, ok := conn.LocalAddr().(*net.TCPAddr); ok {
ip := addr.IP.String()
entry.AddAttribute("net.host.ip", addr.IP.String())
entry.AddAttribute("net.host.port", strconv.FormatInt(int64(addr.Port), 10))
entry.AddAttribute("net.host.name", t.resolver.GetHostFromIP(ip))
}
}()
}

t.Write(ctx, entry)
}

func truncateMaxLog(data []byte, maxLogSize int) (token []byte) {
dataLength := len(data)

if dataLength >= maxLogSize {
return data[:maxLogSize]
}

if dataLength == 0 {
return nil
}
return data
}

// Stop will stop listening for log entries over TCP.
Expand All @@ -301,4 +334,4 @@ func (t *Input) Stop() error {
t.resolver.Stop()
}
return nil
}
}

0 comments on commit c5ead69

Please sign in to comment.